Spaces:
Running
Running
import math | |
import asyncio | |
import datetime | |
from pyrogram import filters, Client, raw, types,enums | |
from pyrogram.errors import FloodWait | |
from pyrogram.raw import functions | |
from pyrogram.raw.types import Poll,PollAnswer, InputMediaPoll | |
#from pyrogram.raw.base import PollAnswer, Poll | |
from pyrogram.enums.parse_mode import ParseMode | |
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES | |
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery, WebAppInfo | |
from pyrogram.raw.types import KeyboardButtonSimpleWebView | |
#-------------------------Local Imports -----------------------------------# | |
from FileStream import __version__ | |
from FileStream.Database import Database | |
from FileStream.config import Telegram, Server | |
from FileStream.bot import FileStream | |
from FileStream.bot import MULTI_CLIENTS | |
from FileStream.Exceptions import FileNotFound # Corrected the import statement | |
from FileStream.utils.FileProcessors.translation import LANG, BUTTON | |
from FileStream.utils.FileProcessors.human_readable import humanbytes | |
from FileStream.bot.plugins.FileHandlers.stream import private_receive_handler | |
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info,get_name | |
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link | |
#-----------------Starting Point --------------------------# | |
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
#---------------------[ START CMD ]---------------------# | |
async def cb_data(bot: Client, update: CallbackQuery): | |
usr_cmd = update.data.split("_") | |
if usr_cmd[0] == "home": | |
await update.message.edit_text(text=LANG.START_TEXT.format( | |
update.from_user.mention, FileStream.username), | |
disable_web_page_preview=True, | |
reply_markup=BUTTON.START_BUTTONS) | |
elif usr_cmd[0] == "help": | |
await update.message.edit_text(text=LANG.HELP_TEXT.format( | |
Telegram.OWNER_ID), | |
disable_web_page_preview=True, | |
reply_markup=BUTTON.HELP_BUTTONS) | |
elif usr_cmd[0] == "about": | |
await update.message.edit_text(text=LANG.ABOUT_TEXT.format( | |
FileStream.fname, __version__), | |
disable_web_page_preview=True, | |
reply_markup=BUTTON.ABOUT_BUTTONS) | |
#---------------------[ MY FILES CMD ]---------------------# | |
elif usr_cmd[0] == "N/A": | |
await update.answer("N/A", True) | |
elif usr_cmd[0] == "close": | |
await update.message.delete() | |
elif usr_cmd[0] == "back": | |
try: | |
user_id = str(usr_cmd[1]) | |
message_id = int(usr_cmd[2]) | |
print(user_id, message_id) | |
message = await FileStream.get_messages(user_id, message_id) | |
await private_receive_handler(FileStream, message) | |
except FloodWait as e: | |
print(f"Sleeping for {str(e.value)}s") | |
await asyncio.sleep(e.value) | |
await FileStream.send_message( | |
chat_id=Telegram.ULOG_GROUP, | |
text= | |
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", | |
disable_web_page_preview=True, | |
parse_mode=ParseMode.MARKDOWN) | |
elif usr_cmd[0] == "msgdelete": | |
await update.message.edit_caption( | |
caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton( | |
"ʏᴇs", callback_data=f"msgdelyes_{usr_cmd[1]}_{usr_cmd[2]}"), | |
InlineKeyboardButton( | |
"ɴᴏ", callback_data=f"myfile_{usr_cmd[1]}_{usr_cmd[2]}") | |
]])) | |
elif usr_cmd[0] == "msgdelyes": | |
await delete_user_file(usr_cmd[1], int(usr_cmd[2]), update) | |
return | |
elif usr_cmd[0] == "msgdelpvt": | |
await update.message.edit_caption( | |
caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton("ʏᴇs", | |
callback_data=f"msgdelpvtyes_{usr_cmd[1]}"), | |
InlineKeyboardButton("ɴᴏ", | |
callback_data=f"mainstream_{usr_cmd[1]}") | |
]])) | |
elif usr_cmd[0] == "msgdelpvtyes": | |
await delete_user_filex(usr_cmd[1], update) | |
return | |
elif usr_cmd[0] == "mainstream": | |
_id = usr_cmd[1] | |
reply_markup, stream_text = await gen_link(_id=_id) | |
await update.message.edit_text( | |
text=stream_text, | |
parse_mode=ParseMode.HTML, | |
disable_web_page_preview=True, | |
reply_markup=reply_markup, | |
) | |
try: | |
user_id = str(usr_cmd[1]) | |
message_id = int(usr_cmd[2]) | |
response_id = int(usr_cmd[3]) | |
message = await FileStream.get_messages(user_id, message_id) | |
#response = await FileStream.get_messages(user_id, response_id) | |
instruction = { | |
"privacy_type": "PUBLIC", | |
"user_id": user_id, | |
"user_type": "TELEGRAM" | |
} | |
file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown" | |
file_name = get_name(message) | |
print("Detail",file_caption,file_name) | |
await update.message.edit_text( | |
text=file_caption, | |
parse_mode=ParseMode.HTML, | |
disable_web_page_preview=True, | |
reply_markup=InlineKeyboardMarkup([ | |
[InlineKeyboardButton("Movie", callback_data=f"pubup_{user_id}_{message_id}_Movie")], | |
[InlineKeyboardButton("WebSeries", callback_data=f"pubup_{user_id}_{message_id}_WebSeries")], | |
[InlineKeyboardButton("Anime", callback_data=f"pubup_{user_id}_{message_id}_Anime")], | |
[InlineKeyboardButton("Documentary", callback_data=f"pubup_{user_id}_{message_id}_Documentary")], | |
[InlineKeyboardButton("Regular", callback_data=f"pubup_{user_id}_{message_id}_Regular")], | |
]), | |
) | |
"""#await FileStream.delete_messages(chat_id=user_id,message_ids=response_id) | |
await FileStream.send_poll( | |
type= enums.PollType.REGULAR, | |
reply_to_message_id=message_id, | |
explanation=f"{file_caption}", | |
question=f"{message_id}_{user_id}_{response_id}_File Name :\n{file_caption}", | |
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], | |
allows_multiple_answers=True, | |
chat_id=user_id | |
) | |
""" | |
except Exception as e: | |
print(f"An error occurred: {str(e)}") | |
await FileStream.send_message( | |
chat_id=Telegram.ULOG_GROUP, | |
text=f"An error occurred: {str(e)}" | |
) | |
elif usr_cmd[0] == "pubup": | |
try: | |
user_id = str(usr_cmd[1]) | |
message_id = int(usr_cmd[2]) | |
msg_type = usr_cmd[3] | |
#response_id = int(usr_cmd[3]) | |
message = await FileStream.get_messages(user_id, message_id) | |
#response = await FileStream.get_messages(user_id, response_id) | |
#print("Response Update:",update,f"Message ID:{response_id}") | |
instruction = { | |
"privacy_type": "PUBLIC", | |
"user_id": user_id, | |
"user_type": "TELEGRAM" | |
} | |
file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown" | |
file_name = get_name(message) | |
print("Detail",file_caption,"\nFile:",file_name) | |
""" | |
await FileStream.delete_messages( | |
chat_id=user_id, | |
message_ids=message_id | |
) | |
await FileStream.send_poll( | |
type= enums.PollType.REGULAR, | |
reply_to_message_id=message_id, | |
question="Is this a poll question?", | |
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], | |
allows_multiple_answers=True, | |
chat_id=user_id | |
) | |
functions.messages.EditMessage( | |
peer=user_id, | |
id=message_id, | |
media=InputMediaPoll( | |
poll=Poll( | |
id=FileStream.rnd_id(), | |
question="Is this a poll question?", | |
answers=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], | |
multiple_choice=True, | |
quiz=False, | |
) | |
), | |
) | |
print("Poll function and all initializers executed successfully.") | |
print("dir(Poll):", dir(Poll)) | |
print("Poll.__init__:", Poll.__init__) | |
print("Poll.__init__ inputs:", Poll.__init__.__code__.co_varnames) | |
print("InputMediaPoll.__init__ inputs:", InputMediaPoll.__init__.__code__.co_varnames) | |
update=await FileStream.edit_message_media( | |
chat_id=user_id, | |
message_id=response_id, | |
media=InputMediaPoll( | |
caption="Is this a poll question?", | |
poll=Poll( | |
id=FileStream.rnd_id(), | |
question="Is this a poll question?", | |
answers=[ | |
PollAnswer(text="Movie", option=bytes([1])), | |
PollAnswer(text="WebSeries", option=bytes([2])), | |
PollAnswer(text="Anime", option=bytes([3])), | |
PollAnswer(text="Documentary", option=bytes([4])) | |
], | |
closed=False, | |
public_voters=True, | |
multiple_choice=True, | |
quiz=False, | |
close_period=None, | |
close_date=None | |
), | |
) | |
) | |
print("Update:",update) | |
PollAnswer( | |
id=FileStream.rnd_id(), | |
question="Is this a poll question?", | |
answers=["Movie","WebSeries"), | |
PollAnswer(text="Anime"), | |
PollAnswer(text="Documentary"), | |
PollAnswer(text="Regular") | |
], | |
multiple_choice=True, | |
quiz=False, | |
answers=[ | |
PollAnswer("Movie"), | |
PollAnswer(text="WebSeries"), | |
PollAnswer(text="Anime"), | |
PollAnswer(text="Documentary"), | |
PollAnswer(text="Regular") | |
], | |
await FileStream.send_poll( | |
type= enums.PollType.REGULAR, | |
reply_to_message_id=message_id, | |
question="Is this a poll question?", | |
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], | |
allows_multiple_answers=True, | |
chat_id=user_id | |
) | |
#type=enums.PollType.QUIZ, | |
print(polls) | |
""" | |
file_info = get_file_info(message, instruction) | |
# Here we are Adding the File Into the database First | |
inserted_id = await db.add_file(file_info) | |
await get_file_ids(False, inserted_id, message) | |
#All the Time Get_file_ids should be called before update privacy or else tagged_users will be {} | |
await db.update_privacy(file_info) | |
reply_markup, stream_text = await gen_link(_id=inserted_id) | |
await update.message.edit_text( | |
text=stream_text, | |
parse_mode=ParseMode.HTML, | |
disable_web_page_preview=True, | |
reply_markup=reply_markup, | |
) | |
except FloodWait as e: | |
print(f"Sleeping for {str(e.value)}s") | |
await asyncio.sleep(e.value) | |
await FileStream.send_message( | |
chat_id=Telegram.ULOG_GROUP, | |
text= | |
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", | |
disable_web_page_preview=True, | |
parse_mode=ParseMode.MARKDOWN) | |
except Exception as e: | |
print(f"An error occurred: {str(e)}") | |
await FileStream.send_message( | |
chat_id=Telegram.ULOG_GROUP, | |
text=f"An error occurred: {str(e)}" | |
) | |
elif usr_cmd[0] == "privup": | |
try: | |
user_id = str(usr_cmd[1]) | |
message_id = int(usr_cmd[2]) | |
message = await FileStream.get_messages(user_id, message_id) | |
instruction = { | |
"privacy_type": "PRIVATE", | |
"user_id": user_id, | |
"user_type": "TELEGRAM" | |
} | |
file_info = get_file_info(message, instruction) | |
# Here we are Adding the File Into the database First | |
db_id = await db.add_file(file_info) | |
await get_file_ids(False, db_id, message) | |
if True: | |
file_info = await db.get_file(db_id) | |
reply_markup, stream_text = await priv_func(file_info['file']['file_name'], file_info['file']['file_size']) | |
await update.message.edit_text( | |
text=stream_text, | |
parse_mode=ParseMode.HTML, | |
disable_web_page_preview=True, | |
reply_markup=reply_markup, | |
) | |
except FloodWait as e: | |
print(f"Sleeping for {str(e.value)}s") | |
await asyncio.sleep(e.value) | |
await FileStream.send_message( | |
chat_id=Telegram.ULOG_GROUP, | |
text= | |
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", | |
disable_web_page_preview=True, | |
parse_mode=ParseMode.MARKDOWN) | |
elif usr_cmd[0] == "userfiles": | |
file_list, total_files = await gen_file_list_button(int(usr_cmd[1]), update.from_user.id) | |
await update.message.edit_caption( | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
elif usr_cmd[0] == "userprivfiles": | |
file_list, total_files = await gen_privfile_list_button( | |
int(usr_cmd[1]), update.from_user.id) | |
await update.message.edit_caption( | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
elif usr_cmd[0] == "userallfiles": | |
file_list, total_files = await gen_allfile_list_button( | |
int(usr_cmd[1]), update.from_user.id) | |
await update.message.edit_caption( | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
elif usr_cmd[0] == "myfile": | |
await gen_file_menu(usr_cmd[1], usr_cmd[2], update) | |
return | |
elif usr_cmd[0] == "allfile": | |
await gen_allfile_menu(usr_cmd[1], usr_cmd[2], update) | |
return | |
elif usr_cmd[0] == "myprivfile": | |
await gen_privfile_menu(usr_cmd[1], usr_cmd[2], update) | |
return | |
elif usr_cmd[0] == "sendfile": | |
myfile = await db.get_file(usr_cmd[1]) | |
file_name = myfile['file']['file_name'] | |
await update.answer(f"Sending File {file_name}") | |
await update.message.reply_cached_media(myfile['file']['file_id'],caption=f'**{file_name}**') | |
else: | |
await update.message.delete() | |
#---------------------[ MY FILES FUNC ]---------------------# | |
async def gen_file_list_button(file_list_no: int, user_id: int): | |
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] | |
user_files, total_files = await db.find_files(user_id, file_range) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton(f"📦 {x['file']['caption']}", | |
callback_data=f"myfile_{x['_id']}_{file_list_no}") | |
]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton( | |
"◄", | |
callback_data="{}".format("userfiles_" + | |
str(file_list_no - | |
1) if file_list_no > 1 else 'N/A')), | |
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", | |
callback_data="N/A"), | |
InlineKeyboardButton( | |
"►", | |
callback_data="{}".format("userfiles_" + | |
str(file_list_no + | |
1) if total_files > file_list_no * | |
10 else 'N/A')) | |
]) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
return file_list, total_files | |
async def gen_file_menu(_id, file_list_no, update: CallbackQuery): | |
try: | |
myfile_info = await db.get_file(_id) | |
except FileNotFound: | |
await update.answer("File Not Found") | |
return | |
file_id = FileId.decode(myfile_info['file']['file_id']) | |
if file_id.file_type in PHOTO_TYPES: | |
file_type = "Image" | |
elif file_id.file_type == FileType.VOICE: | |
file_type = "Voice" | |
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, | |
FileType.VIDEO_NOTE): | |
file_type = "Video" | |
elif file_id.file_type == FileType.DOCUMENT: | |
file_type = "Document" | |
elif file_id.file_type == FileType.STICKER: | |
file_type = "Sticker" | |
elif file_id.file_type == FileType.AUDIO: | |
file_type = "Audio" | |
else: | |
file_type = "Unknown" | |
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" | |
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" | |
if "video" in file_type.lower(): | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
else: | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
TiMe = myfile_info['time'] | |
if type(TiMe) == float: | |
date = datetime.datetime.fromtimestamp(TiMe) | |
await update.edit_message_caption( | |
caption="**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" | |
.format(myfile_info['file']['file_name'], | |
humanbytes(int(myfile_info['file']['file_size'])), file_type, | |
TiMe if isinstance(TiMe, str) else date.date()), reply_markup=MYFILES_BUTTONS) | |
#################------ private file list | |
async def gen_privfile_list_button(file_list_no: int, user_id: int): | |
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] | |
user_files, total_files = await db.find_privfiles(user_id, file_range) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton( | |
f"📦 {x['file']['caption']}", | |
callback_data=f"myprivfile_{x['_id']}_{file_list_no}") | |
]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton( | |
"◄", | |
callback_data="{}".format("userprivfiles_" + | |
str(file_list_no - | |
1) if file_list_no > 1 else 'N/A')), | |
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", | |
callback_data="N/A"), | |
InlineKeyboardButton( | |
"►", | |
callback_data="{}".format("userprivfiles_" + | |
str(file_list_no + | |
1) if total_files > file_list_no * | |
10 else 'N/A')) | |
]) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
return file_list, total_files | |
async def gen_privfile_menu(_id, file_list_no, update: CallbackQuery): | |
try: | |
myfile_info = await db.get_privfile(_id) | |
except FileNotFound: | |
await update.answer("File Not Found") | |
return | |
file_id = FileId.decode(myfile_info['file']['file_id']) | |
if file_id.file_type in PHOTO_TYPES: | |
file_type = "Image" | |
elif file_id.file_type == FileType.VOICE: | |
file_type = "Voice" | |
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, | |
FileType.VIDEO_NOTE): | |
file_type = "Video" | |
elif file_id.file_type == FileType.DOCUMENT: | |
file_type = "Document" | |
elif file_id.file_type == FileType.STICKER: | |
file_type = "Sticker" | |
elif file_id.file_type == FileType.AUDIO: | |
file_type = "Audio" | |
else: | |
file_type = "Unknown" | |
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" | |
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" | |
if "video" in file_type.lower(): | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
else: | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
TiMe = myfile_info['time'] | |
if type(TiMe) == float: | |
date = datetime.datetime.fromtimestamp(TiMe) | |
await update.edit_message_caption( | |
caption= | |
"**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" | |
.format(myfile_info['file_name'], | |
humanbytes(int(myfile_info['file']['file_size'])), file_type, | |
TiMe if isinstance(TiMe, str) else date.date()), | |
reply_markup=MYFILES_BUTTONS) | |
##################---- all file function-----################## | |
async def gen_allfile_list_button(file_list_no: int, user_id: int): | |
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] | |
user_files, total_files = await db.find_all_files(file_range) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton( | |
f"📦 {x['file']['caption']}", | |
callback_data=f"allfile_{x['_id']}_{file_list_no}") | |
]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton( | |
"◄", | |
callback_data="{}".format("userallfiles_" + | |
str(file_list_no - | |
1) if file_list_no > 1 else 'N/A')), | |
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", | |
callback_data="N/A"), | |
InlineKeyboardButton( | |
"►", | |
callback_data="{}".format("userallfiles_" + | |
str(file_list_no + | |
1) if total_files > file_list_no * | |
10 else 'N/A')) | |
]) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
return file_list, total_files | |
async def gen_allfile_menu(_id, file_list_no, update: CallbackQuery): | |
try: | |
myfile_info = await db.get_file(_id) | |
except FileNotFound: | |
await update.answer("File Not Found") | |
return | |
file_id = FileId.decode(myfile_info['file']['file_id']) | |
if file_id.file_type in PHOTO_TYPES: | |
file_type = "Image" | |
elif file_id.file_type == FileType.VOICE: | |
file_type = "Voice" | |
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, | |
FileType.VIDEO_NOTE): | |
file_type = "Video" | |
elif file_id.file_type == FileType.DOCUMENT: | |
file_type = "Document" | |
elif file_id.file_type == FileType.STICKER: | |
file_type = "Sticker" | |
elif file_id.file_type == FileType.AUDIO: | |
file_type = "Audio" | |
else: | |
file_type = "Unknown" | |
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" | |
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" | |
if "video" in file_type.lower(): | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), | |
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) | |
], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
else: | |
MYFILES_BUTTONS = InlineKeyboardMarkup([ | |
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], | |
[ | |
InlineKeyboardButton( | |
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), | |
InlineKeyboardButton( | |
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", | |
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") | |
], | |
[ | |
InlineKeyboardButton( | |
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) | |
] | |
]) | |
TiMe = myfile_info['time'] | |
if type(TiMe) == float: | |
date = datetime.datetime.fromtimestamp(TiMe) | |
await update.edit_message_caption( | |
caption= | |
"**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" | |
.format(myfile_info['file']['file_name'], | |
humanbytes(int(myfile_info['file']['file_size'])), file_type, | |
TiMe if isinstance(TiMe, str) else date.date()), | |
reply_markup=MYFILES_BUTTONS) | |
async def delete_user_file(_id, file_list_no: int, update: CallbackQuery): | |
try: | |
myfile_info = await db.get_file(_id) | |
if not myfile_info: | |
myfile_info = await db.get_privfile(db_id) | |
await db.delete_one_privfile(myfile_info['_id']) | |
#await db.count_links(update.from_user.id, "-") | |
await update.message.edit_caption( | |
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" + | |
update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", | |
""), | |
reply_markup=InlineKeyboardMarkup( | |
[[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]])) | |
else: | |
await db.delete_one_file(myfile_info['_id']) | |
await db.count_links(update.from_user.id, "-") | |
await update.message.edit_caption( | |
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" + | |
update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", | |
""), | |
reply_markup=InlineKeyboardMarkup( | |
[[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]])) | |
except FileNotFound: | |
await update.answer("File Already Deleted") | |
return | |
async def delete_user_filex(_id, update: CallbackQuery): | |
try: | |
myfile_info = await db.get_file(_id) | |
if not myfile_info: | |
myfile_info = await db.get_privfile(_id) | |
await db.delete_one_privfile(myfile_info['_id']) | |
await update.message.edit_caption( | |
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n", | |
reply_markup=InlineKeyboardMarkup( | |
[[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]])) | |
else: | |
await db.delete_one_file(myfile_info['_id']) | |
await db.count_links(update.from_user.id, "-") | |
await update.message.edit_caption( | |
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n", | |
reply_markup=InlineKeyboardMarkup( | |
[[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]])) | |
except FileNotFound: | |
await update.answer("File Already Deleted") | |
return | |