import math import asyncio import datetime from pyrogram import filters, Client, raw, types,enums from pyrogram.errors import FloodWait from pyrogram.raw import functions from pyrogram.raw.types import Poll,PollAnswer, InputMediaPoll #from pyrogram.raw.base import PollAnswer, Poll from pyrogram.enums.parse_mode import ParseMode from pyrogram.file_id import FileId, FileType, PHOTO_TYPES from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery, WebAppInfo from pyrogram.raw.types import KeyboardButtonSimpleWebView #-------------------------Local Imports -----------------------------------# from FileStream import __version__ from FileStream.Database import Database from FileStream.config import Telegram, Server from FileStream.bot import FileStream from FileStream.bot import MULTI_CLIENTS from FileStream.Exceptions import FileNotFound # Corrected the import statement from FileStream.utils.FileProcessors.translation import LANG, BUTTON from FileStream.utils.FileProcessors.human_readable import humanbytes from FileStream.bot.plugins.FileHandlers.stream import private_receive_handler from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info,get_name from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link #-----------------Starting Point --------------------------# db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) #---------------------[ START CMD ]---------------------# @FileStream.on_callback_query() async def cb_data(bot: Client, update: CallbackQuery): usr_cmd = update.data.split("_") if usr_cmd[0] == "home": await update.message.edit_text(text=LANG.START_TEXT.format( update.from_user.mention, FileStream.username), disable_web_page_preview=True, reply_markup=BUTTON.START_BUTTONS) elif usr_cmd[0] == "help": await update.message.edit_text(text=LANG.HELP_TEXT.format( Telegram.OWNER_ID), disable_web_page_preview=True, reply_markup=BUTTON.HELP_BUTTONS) elif usr_cmd[0] == "about": await update.message.edit_text(text=LANG.ABOUT_TEXT.format( FileStream.fname, __version__), disable_web_page_preview=True, reply_markup=BUTTON.ABOUT_BUTTONS) #---------------------[ MY FILES CMD ]---------------------# elif usr_cmd[0] == "N/A": await update.answer("N/A", True) elif usr_cmd[0] == "close": await update.message.delete() elif usr_cmd[0] == "back": try: user_id = str(usr_cmd[1]) message_id = int(usr_cmd[2]) print(user_id, message_id) message = await FileStream.get_messages(user_id, message_id) await private_receive_handler(FileStream, message) except FloodWait as e: print(f"Sleeping for {str(e.value)}s") await asyncio.sleep(e.value) await FileStream.send_message( chat_id=Telegram.ULOG_GROUP, text= f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN) elif usr_cmd[0] == "msgdelete": await update.message.edit_caption( caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton( "ʏᴇs", callback_data=f"msgdelyes_{usr_cmd[1]}_{usr_cmd[2]}"), InlineKeyboardButton( "ɴᴏ", callback_data=f"myfile_{usr_cmd[1]}_{usr_cmd[2]}") ]])) elif usr_cmd[0] == "msgdelyes": await delete_user_file(usr_cmd[1], int(usr_cmd[2]), update) return elif usr_cmd[0] == "msgdelpvt": await update.message.edit_caption( caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton("ʏᴇs", callback_data=f"msgdelpvtyes_{usr_cmd[1]}"), InlineKeyboardButton("ɴᴏ", callback_data=f"mainstream_{usr_cmd[1]}") ]])) elif usr_cmd[0] == "msgdelpvtyes": await delete_user_filex(usr_cmd[1], update) return elif usr_cmd[0] == "mainstream": _id = usr_cmd[1] reply_markup, stream_text = await gen_link(_id=_id) await update.message.edit_text( text=stream_text, parse_mode=ParseMode.HTML, disable_web_page_preview=True, reply_markup=reply_markup, ) try: user_id = str(usr_cmd[1]) message_id = int(usr_cmd[2]) response_id = int(usr_cmd[3]) message = await FileStream.get_messages(user_id, message_id) #response = await FileStream.get_messages(user_id, response_id) instruction = { "privacy_type": "PUBLIC", "user_id": user_id, "user_type": "TELEGRAM" } file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown" file_name = get_name(message) print("Detail",file_caption,file_name) await update.message.edit_text( text=file_caption, parse_mode=ParseMode.HTML, disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup([ [InlineKeyboardButton("Movie", callback_data=f"pubup_{user_id}_{message_id}_Movie")], [InlineKeyboardButton("WebSeries", callback_data=f"pubup_{user_id}_{message_id}_WebSeries")], [InlineKeyboardButton("Anime", callback_data=f"pubup_{user_id}_{message_id}_Anime")], [InlineKeyboardButton("Documentary", callback_data=f"pubup_{user_id}_{message_id}_Documentary")], [InlineKeyboardButton("Regular", callback_data=f"pubup_{user_id}_{message_id}_Regular")], ]), ) """#await FileStream.delete_messages(chat_id=user_id,message_ids=response_id) await FileStream.send_poll( type= enums.PollType.REGULAR, reply_to_message_id=message_id, explanation=f"{file_caption}", question=f"{message_id}_{user_id}_{response_id}_File Name :\n{file_caption}", options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], allows_multiple_answers=True, chat_id=user_id ) """ except Exception as e: print(f"An error occurred: {str(e)}") await FileStream.send_message( chat_id=Telegram.ULOG_GROUP, text=f"An error occurred: {str(e)}" ) elif usr_cmd[0] == "pubup": try: user_id = str(usr_cmd[1]) message_id = int(usr_cmd[2]) msg_type = usr_cmd[3] #response_id = int(usr_cmd[3]) message = await FileStream.get_messages(user_id, message_id) #response = await FileStream.get_messages(user_id, response_id) #print("Response Update:",update,f"Message ID:{response_id}") instruction = { "privacy_type": "PUBLIC", "user_id": user_id, "user_type": "TELEGRAM" } file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown" file_name = get_name(message) print("Detail",file_caption,"\nFile:",file_name) """ await FileStream.delete_messages( chat_id=user_id, message_ids=message_id ) await FileStream.send_poll( type= enums.PollType.REGULAR, reply_to_message_id=message_id, question="Is this a poll question?", options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], allows_multiple_answers=True, chat_id=user_id ) functions.messages.EditMessage( peer=user_id, id=message_id, media=InputMediaPoll( poll=Poll( id=FileStream.rnd_id(), question="Is this a poll question?", answers=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], multiple_choice=True, quiz=False, ) ), ) print("Poll function and all initializers executed successfully.") print("dir(Poll):", dir(Poll)) print("Poll.__init__:", Poll.__init__) print("Poll.__init__ inputs:", Poll.__init__.__code__.co_varnames) print("InputMediaPoll.__init__ inputs:", InputMediaPoll.__init__.__code__.co_varnames) update=await FileStream.edit_message_media( chat_id=user_id, message_id=response_id, media=InputMediaPoll( caption="Is this a poll question?", poll=Poll( id=FileStream.rnd_id(), question="Is this a poll question?", answers=[ PollAnswer(text="Movie", option=bytes([1])), PollAnswer(text="WebSeries", option=bytes([2])), PollAnswer(text="Anime", option=bytes([3])), PollAnswer(text="Documentary", option=bytes([4])) ], closed=False, public_voters=True, multiple_choice=True, quiz=False, close_period=None, close_date=None ), ) ) print("Update:",update) PollAnswer( id=FileStream.rnd_id(), question="Is this a poll question?", answers=["Movie","WebSeries"), PollAnswer(text="Anime"), PollAnswer(text="Documentary"), PollAnswer(text="Regular") ], multiple_choice=True, quiz=False, answers=[ PollAnswer("Movie"), PollAnswer(text="WebSeries"), PollAnswer(text="Anime"), PollAnswer(text="Documentary"), PollAnswer(text="Regular") ], await FileStream.send_poll( type= enums.PollType.REGULAR, reply_to_message_id=message_id, question="Is this a poll question?", options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"], allows_multiple_answers=True, chat_id=user_id ) #type=enums.PollType.QUIZ, print(polls) """ file_info = get_file_info(message, instruction) # Here we are Adding the File Into the database First inserted_id = await db.add_file(file_info) await get_file_ids(False, inserted_id, message) #All the Time Get_file_ids should be called before update privacy or else tagged_users will be {} await db.update_privacy(file_info) reply_markup, stream_text = await gen_link(_id=inserted_id) await update.message.edit_text( text=stream_text, parse_mode=ParseMode.HTML, disable_web_page_preview=True, reply_markup=reply_markup, ) except FloodWait as e: print(f"Sleeping for {str(e.value)}s") await asyncio.sleep(e.value) await FileStream.send_message( chat_id=Telegram.ULOG_GROUP, text= f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN) except Exception as e: print(f"An error occurred: {str(e)}") await FileStream.send_message( chat_id=Telegram.ULOG_GROUP, text=f"An error occurred: {str(e)}" ) elif usr_cmd[0] == "privup": try: user_id = str(usr_cmd[1]) message_id = int(usr_cmd[2]) message = await FileStream.get_messages(user_id, message_id) instruction = { "privacy_type": "PRIVATE", "user_id": user_id, "user_type": "TELEGRAM" } file_info = get_file_info(message, instruction) # Here we are Adding the File Into the database First db_id = await db.add_file(file_info) await get_file_ids(False, db_id, message) if True: file_info = await db.get_file(db_id) reply_markup, stream_text = await priv_func(file_info['file']['file_name'], file_info['file']['file_size']) await update.message.edit_text( text=stream_text, parse_mode=ParseMode.HTML, disable_web_page_preview=True, reply_markup=reply_markup, ) except FloodWait as e: print(f"Sleeping for {str(e.value)}s") await asyncio.sleep(e.value) await FileStream.send_message( chat_id=Telegram.ULOG_GROUP, text= f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`", disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN) elif usr_cmd[0] == "userfiles": file_list, total_files = await gen_file_list_button(int(usr_cmd[1]), update.from_user.id) await update.message.edit_caption( caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) elif usr_cmd[0] == "userprivfiles": file_list, total_files = await gen_privfile_list_button( int(usr_cmd[1]), update.from_user.id) await update.message.edit_caption( caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) elif usr_cmd[0] == "userallfiles": file_list, total_files = await gen_allfile_list_button( int(usr_cmd[1]), update.from_user.id) await update.message.edit_caption( caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) elif usr_cmd[0] == "myfile": await gen_file_menu(usr_cmd[1], usr_cmd[2], update) return elif usr_cmd[0] == "allfile": await gen_allfile_menu(usr_cmd[1], usr_cmd[2], update) return elif usr_cmd[0] == "myprivfile": await gen_privfile_menu(usr_cmd[1], usr_cmd[2], update) return elif usr_cmd[0] == "sendfile": myfile = await db.get_file(usr_cmd[1]) file_name = myfile['file']['file_name'] await update.answer(f"Sending File {file_name}") await update.message.reply_cached_media(myfile['file']['file_id'],caption=f'**{file_name}**') else: await update.message.delete() #---------------------[ MY FILES FUNC ]---------------------# async def gen_file_list_button(file_list_no: int, user_id: int): file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] user_files, total_files = await db.find_files(user_id, file_range) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton(f"📦 {x['file']['caption']}", callback_data=f"myfile_{x['_id']}_{file_list_no}") ]) if total_files > 10: file_list.append([ InlineKeyboardButton( "◄", callback_data="{}".format("userfiles_" + str(file_list_no - 1) if file_list_no > 1 else 'N/A')), InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", callback_data="N/A"), InlineKeyboardButton( "►", callback_data="{}".format("userfiles_" + str(file_list_no + 1) if total_files > file_list_no * 10 else 'N/A')) ]) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) return file_list, total_files async def gen_file_menu(_id, file_list_no, update: CallbackQuery): try: myfile_info = await db.get_file(_id) except FileNotFound: await update.answer("File Not Found") return file_id = FileId.decode(myfile_info['file']['file_id']) if file_id.file_type in PHOTO_TYPES: file_type = "Image" elif file_id.file_type == FileType.VOICE: file_type = "Voice" elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE): file_type = "Video" elif file_id.file_type == FileType.DOCUMENT: file_type = "Document" elif file_id.file_type == FileType.STICKER: file_type = "Sticker" elif file_id.file_type == FileType.AUDIO: file_type = "Audio" else: file_type = "Unknown" page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" if "video" in file_type.lower(): MYFILES_BUTTONS = InlineKeyboardMarkup([ [ InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) else: MYFILES_BUTTONS = InlineKeyboardMarkup([ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) TiMe = myfile_info['time'] if type(TiMe) == float: date = datetime.datetime.fromtimestamp(TiMe) await update.edit_message_caption( caption="**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" .format(myfile_info['file']['file_name'], humanbytes(int(myfile_info['file']['file_size'])), file_type, TiMe if isinstance(TiMe, str) else date.date()), reply_markup=MYFILES_BUTTONS) #################------ private file list async def gen_privfile_list_button(file_list_no: int, user_id: int): file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] user_files, total_files = await db.find_privfiles(user_id, file_range) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton( f"📦 {x['file']['caption']}", callback_data=f"myprivfile_{x['_id']}_{file_list_no}") ]) if total_files > 10: file_list.append([ InlineKeyboardButton( "◄", callback_data="{}".format("userprivfiles_" + str(file_list_no - 1) if file_list_no > 1 else 'N/A')), InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", callback_data="N/A"), InlineKeyboardButton( "►", callback_data="{}".format("userprivfiles_" + str(file_list_no + 1) if total_files > file_list_no * 10 else 'N/A')) ]) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) return file_list, total_files async def gen_privfile_menu(_id, file_list_no, update: CallbackQuery): try: myfile_info = await db.get_privfile(_id) except FileNotFound: await update.answer("File Not Found") return file_id = FileId.decode(myfile_info['file']['file_id']) if file_id.file_type in PHOTO_TYPES: file_type = "Image" elif file_id.file_type == FileType.VOICE: file_type = "Voice" elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE): file_type = "Video" elif file_id.file_type == FileType.DOCUMENT: file_type = "Document" elif file_id.file_type == FileType.STICKER: file_type = "Sticker" elif file_id.file_type == FileType.AUDIO: file_type = "Audio" else: file_type = "Unknown" page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" if "video" in file_type.lower(): MYFILES_BUTTONS = InlineKeyboardMarkup([ [ InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) else: MYFILES_BUTTONS = InlineKeyboardMarkup([ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) TiMe = myfile_info['time'] if type(TiMe) == float: date = datetime.datetime.fromtimestamp(TiMe) await update.edit_message_caption( caption= "**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" .format(myfile_info['file_name'], humanbytes(int(myfile_info['file']['file_size'])), file_type, TiMe if isinstance(TiMe, str) else date.date()), reply_markup=MYFILES_BUTTONS) ##################---- all file function-----################## async def gen_allfile_list_button(file_list_no: int, user_id: int): file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10] user_files, total_files = await db.find_all_files(file_range) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton( f"📦 {x['file']['caption']}", callback_data=f"allfile_{x['_id']}_{file_list_no}") ]) if total_files > 10: file_list.append([ InlineKeyboardButton( "◄", callback_data="{}".format("userallfiles_" + str(file_list_no - 1) if file_list_no > 1 else 'N/A')), InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}", callback_data="N/A"), InlineKeyboardButton( "►", callback_data="{}".format("userallfiles_" + str(file_list_no + 1) if total_files > file_list_no * 10 else 'N/A')) ]) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")]) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) return file_list, total_files async def gen_allfile_menu(_id, file_list_no, update: CallbackQuery): try: myfile_info = await db.get_file(_id) except FileNotFound: await update.answer("File Not Found") return file_id = FileId.decode(myfile_info['file']['file_id']) if file_id.file_type in PHOTO_TYPES: file_type = "Image" elif file_id.file_type == FileType.VOICE: file_type = "Voice" elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE): file_type = "Video" elif file_id.file_type == FileType.DOCUMENT: file_type = "Document" elif file_id.file_type == FileType.STICKER: file_type = "Sticker" elif file_id.file_type == FileType.AUDIO: file_type = "Audio" else: file_type = "Unknown" page_link = f"{Server.URL}app/watch/{myfile_info['_id']}" stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}" if "video" in file_type.lower(): MYFILES_BUTTONS = InlineKeyboardMarkup([ [ InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link), InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link) ], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) else: MYFILES_BUTTONS = InlineKeyboardMarkup([ [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)], [ InlineKeyboardButton( "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"), InlineKeyboardButton( "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ", callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}") ], [ InlineKeyboardButton( "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no)) ] ]) TiMe = myfile_info['time'] if type(TiMe) == float: date = datetime.datetime.fromtimestamp(TiMe) await update.edit_message_caption( caption= "**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`" .format(myfile_info['file']['file_name'], humanbytes(int(myfile_info['file']['file_size'])), file_type, TiMe if isinstance(TiMe, str) else date.date()), reply_markup=MYFILES_BUTTONS) async def delete_user_file(_id, file_list_no: int, update: CallbackQuery): try: myfile_info = await db.get_file(_id) if not myfile_info: myfile_info = await db.get_privfile(db_id) await db.delete_one_privfile(myfile_info['_id']) #await db.count_links(update.from_user.id, "-") await update.message.edit_caption( caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" + update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", ""), reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]])) else: await db.delete_one_file(myfile_info['_id']) await db.count_links(update.from_user.id, "-") await update.message.edit_caption( caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" + update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", ""), reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]])) except FileNotFound: await update.answer("File Already Deleted") return async def delete_user_filex(_id, update: CallbackQuery): try: myfile_info = await db.get_file(_id) if not myfile_info: myfile_info = await db.get_privfile(_id) await db.delete_one_privfile(myfile_info['_id']) await update.message.edit_caption( caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]])) else: await db.delete_one_file(myfile_info['_id']) await db.count_links(update.from_user.id, "-") await update.message.edit_caption( caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n", reply_markup=InlineKeyboardMarkup( [[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]])) except FileNotFound: await update.answer("File Already Deleted") return