fsb / Unused Codes /callback.py
BinaryONe
Polls Update
a9f0bb6
import math
import asyncio
import datetime
from pyrogram import filters, Client, raw, types,enums
from pyrogram.errors import FloodWait
from pyrogram.raw import functions
from pyrogram.raw.types import Poll,PollAnswer, InputMediaPoll
#from pyrogram.raw.base import PollAnswer, Poll
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery, WebAppInfo
from pyrogram.raw.types import KeyboardButtonSimpleWebView
#-------------------------Local Imports -----------------------------------#
from FileStream import __version__
from FileStream.Database import Database
from FileStream.config import Telegram, Server
from FileStream.bot import FileStream
from FileStream.bot import MULTI_CLIENTS
from FileStream.Exceptions import FileNotFound # Corrected the import statement
from FileStream.utils.FileProcessors.translation import LANG, BUTTON
from FileStream.utils.FileProcessors.human_readable import humanbytes
from FileStream.bot.plugins.FileHandlers.stream import private_receive_handler
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info,get_name
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link
#-----------------Starting Point --------------------------#
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
#---------------------[ START CMD ]---------------------#
@FileStream.on_callback_query()
async def cb_data(bot: Client, update: CallbackQuery):
usr_cmd = update.data.split("_")
if usr_cmd[0] == "home":
await update.message.edit_text(text=LANG.START_TEXT.format(
update.from_user.mention, FileStream.username),
disable_web_page_preview=True,
reply_markup=BUTTON.START_BUTTONS)
elif usr_cmd[0] == "help":
await update.message.edit_text(text=LANG.HELP_TEXT.format(
Telegram.OWNER_ID),
disable_web_page_preview=True,
reply_markup=BUTTON.HELP_BUTTONS)
elif usr_cmd[0] == "about":
await update.message.edit_text(text=LANG.ABOUT_TEXT.format(
FileStream.fname, __version__),
disable_web_page_preview=True,
reply_markup=BUTTON.ABOUT_BUTTONS)
#---------------------[ MY FILES CMD ]---------------------#
elif usr_cmd[0] == "N/A":
await update.answer("N/A", True)
elif usr_cmd[0] == "close":
await update.message.delete()
elif usr_cmd[0] == "back":
try:
user_id = str(usr_cmd[1])
message_id = int(usr_cmd[2])
print(user_id, message_id)
message = await FileStream.get_messages(user_id, message_id)
await private_receive_handler(FileStream, message)
except FloodWait as e:
print(f"Sleeping for {str(e.value)}s")
await asyncio.sleep(e.value)
await FileStream.send_message(
chat_id=Telegram.ULOG_GROUP,
text=
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN)
elif usr_cmd[0] == "msgdelete":
await update.message.edit_caption(
caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(
"ʏᴇs", callback_data=f"msgdelyes_{usr_cmd[1]}_{usr_cmd[2]}"),
InlineKeyboardButton(
"ɴᴏ", callback_data=f"myfile_{usr_cmd[1]}_{usr_cmd[2]}")
]]))
elif usr_cmd[0] == "msgdelyes":
await delete_user_file(usr_cmd[1], int(usr_cmd[2]), update)
return
elif usr_cmd[0] == "msgdelpvt":
await update.message.edit_caption(
caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("ʏᴇs",
callback_data=f"msgdelpvtyes_{usr_cmd[1]}"),
InlineKeyboardButton("ɴᴏ",
callback_data=f"mainstream_{usr_cmd[1]}")
]]))
elif usr_cmd[0] == "msgdelpvtyes":
await delete_user_filex(usr_cmd[1], update)
return
elif usr_cmd[0] == "mainstream":
_id = usr_cmd[1]
reply_markup, stream_text = await gen_link(_id=_id)
await update.message.edit_text(
text=stream_text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=reply_markup,
)
try:
user_id = str(usr_cmd[1])
message_id = int(usr_cmd[2])
response_id = int(usr_cmd[3])
message = await FileStream.get_messages(user_id, message_id)
#response = await FileStream.get_messages(user_id, response_id)
instruction = {
"privacy_type": "PUBLIC",
"user_id": user_id,
"user_type": "TELEGRAM"
}
file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown"
file_name = get_name(message)
print("Detail",file_caption,file_name)
await update.message.edit_text(
text=file_caption,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("Movie", callback_data=f"pubup_{user_id}_{message_id}_Movie")],
[InlineKeyboardButton("WebSeries", callback_data=f"pubup_{user_id}_{message_id}_WebSeries")],
[InlineKeyboardButton("Anime", callback_data=f"pubup_{user_id}_{message_id}_Anime")],
[InlineKeyboardButton("Documentary", callback_data=f"pubup_{user_id}_{message_id}_Documentary")],
[InlineKeyboardButton("Regular", callback_data=f"pubup_{user_id}_{message_id}_Regular")],
]),
)
"""#await FileStream.delete_messages(chat_id=user_id,message_ids=response_id)
await FileStream.send_poll(
type= enums.PollType.REGULAR,
reply_to_message_id=message_id,
explanation=f"{file_caption}",
question=f"{message_id}_{user_id}_{response_id}_File Name :\n{file_caption}",
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"],
allows_multiple_answers=True,
chat_id=user_id
)
"""
except Exception as e:
print(f"An error occurred: {str(e)}")
await FileStream.send_message(
chat_id=Telegram.ULOG_GROUP,
text=f"An error occurred: {str(e)}"
)
elif usr_cmd[0] == "pubup":
try:
user_id = str(usr_cmd[1])
message_id = int(usr_cmd[2])
msg_type = usr_cmd[3]
#response_id = int(usr_cmd[3])
message = await FileStream.get_messages(user_id, message_id)
#response = await FileStream.get_messages(user_id, response_id)
#print("Response Update:",update,f"Message ID:{response_id}")
instruction = {
"privacy_type": "PUBLIC",
"user_id": user_id,
"user_type": "TELEGRAM"
}
file_caption = getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown"
file_name = get_name(message)
print("Detail",file_caption,"\nFile:",file_name)
"""
await FileStream.delete_messages(
chat_id=user_id,
message_ids=message_id
)
await FileStream.send_poll(
type= enums.PollType.REGULAR,
reply_to_message_id=message_id,
question="Is this a poll question?",
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"],
allows_multiple_answers=True,
chat_id=user_id
)
functions.messages.EditMessage(
peer=user_id,
id=message_id,
media=InputMediaPoll(
poll=Poll(
id=FileStream.rnd_id(),
question="Is this a poll question?",
answers=["Movie", "WebSeries", "Anime", "Documentary", "Regular"],
multiple_choice=True,
quiz=False,
)
),
)
print("Poll function and all initializers executed successfully.")
print("dir(Poll):", dir(Poll))
print("Poll.__init__:", Poll.__init__)
print("Poll.__init__ inputs:", Poll.__init__.__code__.co_varnames)
print("InputMediaPoll.__init__ inputs:", InputMediaPoll.__init__.__code__.co_varnames)
update=await FileStream.edit_message_media(
chat_id=user_id,
message_id=response_id,
media=InputMediaPoll(
caption="Is this a poll question?",
poll=Poll(
id=FileStream.rnd_id(),
question="Is this a poll question?",
answers=[
PollAnswer(text="Movie", option=bytes([1])),
PollAnswer(text="WebSeries", option=bytes([2])),
PollAnswer(text="Anime", option=bytes([3])),
PollAnswer(text="Documentary", option=bytes([4]))
],
closed=False,
public_voters=True,
multiple_choice=True,
quiz=False,
close_period=None,
close_date=None
),
)
)
print("Update:",update)
PollAnswer(
id=FileStream.rnd_id(),
question="Is this a poll question?",
answers=["Movie","WebSeries"),
PollAnswer(text="Anime"),
PollAnswer(text="Documentary"),
PollAnswer(text="Regular")
],
multiple_choice=True,
quiz=False,
answers=[
PollAnswer("Movie"),
PollAnswer(text="WebSeries"),
PollAnswer(text="Anime"),
PollAnswer(text="Documentary"),
PollAnswer(text="Regular")
],
await FileStream.send_poll(
type= enums.PollType.REGULAR,
reply_to_message_id=message_id,
question="Is this a poll question?",
options=["Movie", "WebSeries", "Anime", "Documentary", "Regular"],
allows_multiple_answers=True,
chat_id=user_id
)
#type=enums.PollType.QUIZ,
print(polls)
"""
file_info = get_file_info(message, instruction)
# Here we are Adding the File Into the database First
inserted_id = await db.add_file(file_info)
await get_file_ids(False, inserted_id, message)
#All the Time Get_file_ids should be called before update privacy or else tagged_users will be {}
await db.update_privacy(file_info)
reply_markup, stream_text = await gen_link(_id=inserted_id)
await update.message.edit_text(
text=stream_text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=reply_markup,
)
except FloodWait as e:
print(f"Sleeping for {str(e.value)}s")
await asyncio.sleep(e.value)
await FileStream.send_message(
chat_id=Telegram.ULOG_GROUP,
text=
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN)
except Exception as e:
print(f"An error occurred: {str(e)}")
await FileStream.send_message(
chat_id=Telegram.ULOG_GROUP,
text=f"An error occurred: {str(e)}"
)
elif usr_cmd[0] == "privup":
try:
user_id = str(usr_cmd[1])
message_id = int(usr_cmd[2])
message = await FileStream.get_messages(user_id, message_id)
instruction = {
"privacy_type": "PRIVATE",
"user_id": user_id,
"user_type": "TELEGRAM"
}
file_info = get_file_info(message, instruction)
# Here we are Adding the File Into the database First
db_id = await db.add_file(file_info)
await get_file_ids(False, db_id, message)
if True:
file_info = await db.get_file(db_id)
reply_markup, stream_text = await priv_func(file_info['file']['file_name'], file_info['file']['file_size'])
await update.message.edit_text(
text=stream_text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=reply_markup,
)
except FloodWait as e:
print(f"Sleeping for {str(e.value)}s")
await asyncio.sleep(e.value)
await FileStream.send_message(
chat_id=Telegram.ULOG_GROUP,
text=
f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(e.value)}s ғʀᴏᴍ [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(message.from_user.id)}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN)
elif usr_cmd[0] == "userfiles":
file_list, total_files = await gen_file_list_button(int(usr_cmd[1]), update.from_user.id)
await update.message.edit_caption(
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
elif usr_cmd[0] == "userprivfiles":
file_list, total_files = await gen_privfile_list_button(
int(usr_cmd[1]), update.from_user.id)
await update.message.edit_caption(
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
elif usr_cmd[0] == "userallfiles":
file_list, total_files = await gen_allfile_list_button(
int(usr_cmd[1]), update.from_user.id)
await update.message.edit_caption(
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
elif usr_cmd[0] == "myfile":
await gen_file_menu(usr_cmd[1], usr_cmd[2], update)
return
elif usr_cmd[0] == "allfile":
await gen_allfile_menu(usr_cmd[1], usr_cmd[2], update)
return
elif usr_cmd[0] == "myprivfile":
await gen_privfile_menu(usr_cmd[1], usr_cmd[2], update)
return
elif usr_cmd[0] == "sendfile":
myfile = await db.get_file(usr_cmd[1])
file_name = myfile['file']['file_name']
await update.answer(f"Sending File {file_name}")
await update.message.reply_cached_media(myfile['file']['file_id'],caption=f'**{file_name}**')
else:
await update.message.delete()
#---------------------[ MY FILES FUNC ]---------------------#
async def gen_file_list_button(file_list_no: int, user_id: int):
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10]
user_files, total_files = await db.find_files(user_id, file_range)
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(f"📦 {x['file']['caption']}",
callback_data=f"myfile_{x['_id']}_{file_list_no}")
])
if total_files > 10:
file_list.append([
InlineKeyboardButton(
"◄",
callback_data="{}".format("userfiles_" +
str(file_list_no -
1) if file_list_no > 1 else 'N/A')),
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}",
callback_data="N/A"),
InlineKeyboardButton(
"►",
callback_data="{}".format("userfiles_" +
str(file_list_no +
1) if total_files > file_list_no *
10 else 'N/A'))
])
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
return file_list, total_files
async def gen_file_menu(_id, file_list_no, update: CallbackQuery):
try:
myfile_info = await db.get_file(_id)
except FileNotFound:
await update.answer("File Not Found")
return
file_id = FileId.decode(myfile_info['file']['file_id'])
if file_id.file_type in PHOTO_TYPES:
file_type = "Image"
elif file_id.file_type == FileType.VOICE:
file_type = "Voice"
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION,
FileType.VIDEO_NOTE):
file_type = "Video"
elif file_id.file_type == FileType.DOCUMENT:
file_type = "Document"
elif file_id.file_type == FileType.STICKER:
file_type = "Sticker"
elif file_id.file_type == FileType.AUDIO:
file_type = "Audio"
else:
file_type = "Unknown"
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"
if "video" in file_type.lower():
MYFILES_BUTTONS = InlineKeyboardMarkup([
[
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)
],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
else:
MYFILES_BUTTONS = InlineKeyboardMarkup([
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
TiMe = myfile_info['time']
if type(TiMe) == float:
date = datetime.datetime.fromtimestamp(TiMe)
await update.edit_message_caption(
caption="**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
.format(myfile_info['file']['file_name'],
humanbytes(int(myfile_info['file']['file_size'])), file_type,
TiMe if isinstance(TiMe, str) else date.date()), reply_markup=MYFILES_BUTTONS)
#################------ private file list
async def gen_privfile_list_button(file_list_no: int, user_id: int):
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10]
user_files, total_files = await db.find_privfiles(user_id, file_range)
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(
f"📦 {x['file']['caption']}",
callback_data=f"myprivfile_{x['_id']}_{file_list_no}")
])
if total_files > 10:
file_list.append([
InlineKeyboardButton(
"◄",
callback_data="{}".format("userprivfiles_" +
str(file_list_no -
1) if file_list_no > 1 else 'N/A')),
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}",
callback_data="N/A"),
InlineKeyboardButton(
"►",
callback_data="{}".format("userprivfiles_" +
str(file_list_no +
1) if total_files > file_list_no *
10 else 'N/A'))
])
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
return file_list, total_files
async def gen_privfile_menu(_id, file_list_no, update: CallbackQuery):
try:
myfile_info = await db.get_privfile(_id)
except FileNotFound:
await update.answer("File Not Found")
return
file_id = FileId.decode(myfile_info['file']['file_id'])
if file_id.file_type in PHOTO_TYPES:
file_type = "Image"
elif file_id.file_type == FileType.VOICE:
file_type = "Voice"
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION,
FileType.VIDEO_NOTE):
file_type = "Video"
elif file_id.file_type == FileType.DOCUMENT:
file_type = "Document"
elif file_id.file_type == FileType.STICKER:
file_type = "Sticker"
elif file_id.file_type == FileType.AUDIO:
file_type = "Audio"
else:
file_type = "Unknown"
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"
if "video" in file_type.lower():
MYFILES_BUTTONS = InlineKeyboardMarkup([
[
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)
],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
else:
MYFILES_BUTTONS = InlineKeyboardMarkup([
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
TiMe = myfile_info['time']
if type(TiMe) == float:
date = datetime.datetime.fromtimestamp(TiMe)
await update.edit_message_caption(
caption=
"**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
.format(myfile_info['file_name'],
humanbytes(int(myfile_info['file']['file_size'])), file_type,
TiMe if isinstance(TiMe, str) else date.date()),
reply_markup=MYFILES_BUTTONS)
##################---- all file function-----##################
async def gen_allfile_list_button(file_list_no: int, user_id: int):
file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10]
user_files, total_files = await db.find_all_files(file_range)
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(
f"📦 {x['file']['caption']}",
callback_data=f"allfile_{x['_id']}_{file_list_no}")
])
if total_files > 10:
file_list.append([
InlineKeyboardButton(
"◄",
callback_data="{}".format("userallfiles_" +
str(file_list_no -
1) if file_list_no > 1 else 'N/A')),
InlineKeyboardButton(f"{file_list_no}/{math.ceil(total_files/10)}",
callback_data="N/A"),
InlineKeyboardButton(
"►",
callback_data="{}".format("userallfiles_" +
str(file_list_no +
1) if total_files > file_list_no *
10 else 'N/A'))
])
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
return file_list, total_files
async def gen_allfile_menu(_id, file_list_no, update: CallbackQuery):
try:
myfile_info = await db.get_file(_id)
except FileNotFound:
await update.answer("File Not Found")
return
file_id = FileId.decode(myfile_info['file']['file_id'])
if file_id.file_type in PHOTO_TYPES:
file_type = "Image"
elif file_id.file_type == FileType.VOICE:
file_type = "Voice"
elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION,
FileType.VIDEO_NOTE):
file_type = "Video"
elif file_id.file_type == FileType.DOCUMENT:
file_type = "Document"
elif file_id.file_type == FileType.STICKER:
file_type = "Sticker"
elif file_id.file_type == FileType.AUDIO:
file_type = "Audio"
else:
file_type = "Unknown"
page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"
if "video" in file_type.lower():
MYFILES_BUTTONS = InlineKeyboardMarkup([
[
InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link),
InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)
],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
else:
MYFILES_BUTTONS = InlineKeyboardMarkup([
[InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)],
[
InlineKeyboardButton(
"ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
InlineKeyboardButton(
"ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
],
[
InlineKeyboardButton(
"ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
]
])
TiMe = myfile_info['time']
if type(TiMe) == float:
date = datetime.datetime.fromtimestamp(TiMe)
await update.edit_message_caption(
caption=
"**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
.format(myfile_info['file']['file_name'],
humanbytes(int(myfile_info['file']['file_size'])), file_type,
TiMe if isinstance(TiMe, str) else date.date()),
reply_markup=MYFILES_BUTTONS)
async def delete_user_file(_id, file_list_no: int, update: CallbackQuery):
try:
myfile_info = await db.get_file(_id)
if not myfile_info:
myfile_info = await db.get_privfile(db_id)
await db.delete_one_privfile(myfile_info['_id'])
#await db.count_links(update.from_user.id, "-")
await update.message.edit_caption(
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" +
update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ",
""),
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]]))
else:
await db.delete_one_file(myfile_info['_id'])
await db.count_links(update.from_user.id, "-")
await update.message.edit_caption(
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" +
update.message.caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ",
""),
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data=f"userfiles_1")]]))
except FileNotFound:
await update.answer("File Already Deleted")
return
async def delete_user_filex(_id, update: CallbackQuery):
try:
myfile_info = await db.get_file(_id)
if not myfile_info:
myfile_info = await db.get_privfile(_id)
await db.delete_one_privfile(myfile_info['_id'])
await update.message.edit_caption(
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]]))
else:
await db.delete_one_file(myfile_info['_id'])
await db.count_links(update.from_user.id, "-")
await update.message.edit_caption(
caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n",
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data=f"close")]]))
except FileNotFound:
await update.answer("File Already Deleted")
return