|
import math |
|
import asyncio |
|
import datetime |
|
|
|
from pyrogram import filters, Client, raw, types,enums |
|
from pyrogram.errors import FloodWait |
|
from pyrogram.raw import functions |
|
from pyrogram.raw.types import Poll,PollAnswer, InputMediaPoll |
|
|
|
from pyrogram.enums.parse_mode import ParseMode |
|
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES |
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery, WebAppInfo |
|
from pyrogram.raw.types import KeyboardButtonSimpleWebView |
|
|
|
|
|
from FileStream import __version__ |
|
from FileStream.Database import Database |
|
from FileStream.config import Telegram, Server |
|
from FileStream.bot import FileStream |
|
from FileStream.bot import MULTI_CLIENTS |
|
from FileStream.Exceptions import FileNotFound |
|
from FileStream.utils.FileProcessors.translation import LANG, BUTTON |
|
from FileStream.utils.FileProcessors.human_readable import humanbytes |
|
from FileStream.bot.plugins.FileHandlers.stream import private_receive_handler |
|
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info,get_name |
|
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link |
|
|
|
|
|
# Module-wide database handle, built from the Telegram configuration values;
# every handler and helper below goes through this object.
db = Database(
    Telegram.DATABASE_URL,
    Telegram.SESSION_NAME,
)
|
|
|
|
|
|
|
# Categories offered to the user before a public upload is stored.
_UPLOAD_CATEGORIES = ("Movie", "WebSeries", "Anime", "Documentary", "Regular")


def _caption_or_name(message):
    """Return the message caption, else the file name, else "None/Unknown"."""
    # getattr() with a default only helps when the attribute is missing; a
    # caption that exists but is None must fall through to the file name.
    return getattr(message, "caption", None) or get_name(message) or "None/Unknown"


async def _report_flood_wait(update: CallbackQuery, error: FloodWait):
    """Sleep for the requested FloodWait period, then notify the log group."""
    print(f"Sleeping for {str(error.value)}s")
    await asyncio.sleep(error.value)
    # Report the user who pressed the button. The fetched message cannot be
    # used here: it is unbound when the fetch itself raised the FloodWait.
    user = update.from_user
    await FileStream.send_message(
        chat_id=Telegram.ULOG_GROUP,
        text=f"Gᴏᴛ FʟᴏᴏᴅWᴀɪᴛ ᴏғ {str(error.value)}s ғʀᴏᴍ [{user.first_name}](tg://user?id={user.id})\n\n**ᴜsᴇʀ ɪᴅ :** `{str(user.id)}`",
        disable_web_page_preview=True,
        parse_mode=ParseMode.MARKDOWN)


async def _report_error(error: Exception):
    """Print an unexpected error and forward it to the log group."""
    print(f"An error occurred: {str(error)}")
    await FileStream.send_message(
        chat_id=Telegram.ULOG_GROUP,
        text=f"An error occurred: {str(error)}"
    )


async def _prompt_for_category(update: CallbackQuery, usr_cmd):
    """Replace the message with a category chooser for a pending public upload."""
    try:
        user_id = str(usr_cmd[1])
        message_id = int(usr_cmd[2])
        message = await FileStream.get_messages(user_id, message_id)

        file_caption = _caption_or_name(message)
        file_name = get_name(message)
        print("Detail", file_caption, file_name)

        # One button per row; the chosen category travels in the payload.
        category_rows = [
            [InlineKeyboardButton(
                category,
                callback_data=f"pubup_{user_id}_{message_id}_{category}")]
            for category in _UPLOAD_CATEGORIES
        ]
        await update.message.edit_text(
            text=file_caption,
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
            reply_markup=InlineKeyboardMarkup(category_rows),
        )
    except Exception as e:
        await _report_error(e)


@FileStream.on_callback_query()
async def cb_data(bot: Client, update: CallbackQuery):
    """Route an inline-keyboard callback to the action named in its payload.

    ``update.data`` is split on ``"_"``: the first field selects the action
    and the remaining fields are that action's arguments. A payload whose
    action is not recognised deletes the message that carried the button.

    Args:
        bot: Client passed in by the handler machinery (not used here).
        update: The callback query being handled.
    """
    usr_cmd = update.data.split("_")
    action = usr_cmd[0]

    if action == "home":
        await update.message.edit_text(
            text=LANG.START_TEXT.format(update.from_user.mention,
                                        FileStream.username),
            disable_web_page_preview=True,
            reply_markup=BUTTON.START_BUTTONS)

    elif action == "help":
        await update.message.edit_text(
            text=LANG.HELP_TEXT.format(Telegram.OWNER_ID),
            disable_web_page_preview=True,
            reply_markup=BUTTON.HELP_BUTTONS)

    elif action == "about":
        await update.message.edit_text(
            text=LANG.ABOUT_TEXT.format(FileStream.fname, __version__),
            disable_web_page_preview=True,
            reply_markup=BUTTON.ABOUT_BUTTONS)

    elif action == "N/A":
        # Placeholder buttons (page counter, disabled arrows) only show an alert.
        await update.answer("N/A", True)

    elif action == "close":
        await update.message.delete()

    elif action == "back":
        # Re-run the private upload handler on the referenced message.
        try:
            user_id = str(usr_cmd[1])
            message_id = int(usr_cmd[2])
            print(user_id, message_id)
            message = await FileStream.get_messages(user_id, message_id)
            await private_receive_handler(FileStream, message)
        except FloodWait as e:
            await _report_flood_wait(update, e)

    elif action == "msgdelete":
        await update.message.edit_caption(
            caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton(
                    "ʏᴇs", callback_data=f"msgdelyes_{usr_cmd[1]}_{usr_cmd[2]}"),
                InlineKeyboardButton(
                    "ɴᴏ", callback_data=f"myfile_{usr_cmd[1]}_{usr_cmd[2]}")
            ]]))

    elif action == "msgdelyes":
        await delete_user_file(usr_cmd[1], int(usr_cmd[2]), update)
        return

    elif action == "msgdelpvt":
        await update.message.edit_caption(
            caption="**Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ**\n\n",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("ʏᴇs",
                                     callback_data=f"msgdelpvtyes_{usr_cmd[1]}"),
                InlineKeyboardButton("ɴᴏ",
                                     callback_data=f"mainstream_{usr_cmd[1]}")
            ]]))

    elif action == "msgdelpvtyes":
        await delete_user_filex(usr_cmd[1], update)
        return

    elif action == "mainstream":
        _id = usr_cmd[1]
        reply_markup, stream_text = await gen_link(_id=_id)
        await update.message.edit_text(
            text=stream_text,
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
            reply_markup=reply_markup,
        )
        # The category chooser needs the extended payload
        # (user id, message id, response id). The "mainstream_<id>" payloads
        # built in this module carry only the id; running the chooser for
        # them used to raise IndexError and post a spurious error to the
        # log group on every click.
        # NOTE(review): confirm no other module relies on the old behaviour.
        if len(usr_cmd) >= 4:
            await _prompt_for_category(update, usr_cmd)

    elif action == "pubup":
        try:
            user_id = str(usr_cmd[1])
            message_id = int(usr_cmd[2])
            # NOTE(review): the chosen category is required in the payload
            # but is not passed on to the stored record below - confirm
            # whether it should be.
            msg_type = usr_cmd[3]

            message = await FileStream.get_messages(user_id, message_id)
            instruction = {
                "privacy_type": "PUBLIC",
                "user_id": user_id,
                "user_type": "TELEGRAM"
            }
            file_caption = _caption_or_name(message)
            file_name = get_name(message)
            print("Detail", file_caption, "\nFile:", file_name)

            file_info = get_file_info(message, instruction)
            inserted_id = await db.add_file(file_info)
            await get_file_ids(False, inserted_id, message)
            await db.update_privacy(file_info)

            reply_markup, stream_text = await gen_link(_id=inserted_id)
            await update.message.edit_text(
                text=stream_text,
                parse_mode=ParseMode.HTML,
                disable_web_page_preview=True,
                reply_markup=reply_markup,
            )
        except FloodWait as e:
            await _report_flood_wait(update, e)
        except Exception as e:
            await _report_error(e)

    elif action == "privup":
        try:
            user_id = str(usr_cmd[1])
            message_id = int(usr_cmd[2])
            message = await FileStream.get_messages(user_id, message_id)
            instruction = {
                "privacy_type": "PRIVATE",
                "user_id": user_id,
                "user_type": "TELEGRAM"
            }
            file_info = get_file_info(message, instruction)
            db_id = await db.add_file(file_info)
            await get_file_ids(False, db_id, message)

            # Re-read the stored record so the reply reflects what was saved.
            file_info = await db.get_file(db_id)
            reply_markup, stream_text = await priv_func(
                file_info['file']['file_name'], file_info['file']['file_size'])
            await update.message.edit_text(
                text=stream_text,
                parse_mode=ParseMode.HTML,
                disable_web_page_preview=True,
                reply_markup=reply_markup,
            )
        except FloodWait as e:
            await _report_flood_wait(update, e)

    elif action == "userfiles":
        file_list, total_files = await gen_file_list_button(
            int(usr_cmd[1]), update.from_user.id)
        await update.message.edit_caption(
            caption="Total files: {}".format(total_files),
            reply_markup=InlineKeyboardMarkup(file_list))

    elif action == "userprivfiles":
        file_list, total_files = await gen_privfile_list_button(
            int(usr_cmd[1]), update.from_user.id)
        await update.message.edit_caption(
            caption="Total files: {}".format(total_files),
            reply_markup=InlineKeyboardMarkup(file_list))

    elif action == "userallfiles":
        file_list, total_files = await gen_allfile_list_button(
            int(usr_cmd[1]), update.from_user.id)
        await update.message.edit_caption(
            caption="Total files: {}".format(total_files),
            reply_markup=InlineKeyboardMarkup(file_list))

    elif action == "myfile":
        await gen_file_menu(usr_cmd[1], usr_cmd[2], update)
        return

    elif action == "allfile":
        await gen_allfile_menu(usr_cmd[1], usr_cmd[2], update)
        return

    elif action == "myprivfile":
        await gen_privfile_menu(usr_cmd[1], usr_cmd[2], update)
        return

    elif action == "sendfile":
        try:
            myfile = await db.get_file(usr_cmd[1])
        except FileNotFound:
            # Same response the file menus give for a missing record.
            await update.answer("File Not Found")
            return
        file_name = myfile['file']['file_name']
        await update.answer(f"Sending File {file_name}")
        await update.message.reply_cached_media(myfile['file']['file_id'],
                                                caption=f'**{file_name}**')

    else:
        await update.message.delete()
|
|
|
|
|
|
|
|
|
async def gen_file_list_button(file_list_no: int, user_id: int):
    """Build the keyboard rows for one ten-entry page of a user's files.

    Args:
        file_list_no: 1-based page number; selects entries
            ``(page - 1) * 10 + 1`` through ``page * 10``.
        user_id: Passed to ``db.find_files`` together with that range.

    Returns:
        ``(rows, total_files)``: ``rows`` is a list of button rows (one row
        per file, an optional navigation row, and a closing row) and
        ``total_files`` is the count returned by ``db.find_files``.
    """
    last_index = file_list_no * 10
    first_index = last_index - 10 + 1
    user_files, total_files = await db.find_files(user_id,
                                                  [first_index, last_index])

    # One row per file; the payload carries the record id and the page so the
    # file menu can link back to this page.
    rows = [
        [InlineKeyboardButton(
            f"📦 {entry['file']['caption']}",
            callback_data=f"myfile_{entry['_id']}_{file_list_no}")]
        async for entry in user_files
    ]

    if total_files > 10:
        # Arrows that would leave the valid range become inert "N/A" buttons.
        if file_list_no > 1:
            previous_page = "userfiles_" + str(file_list_no - 1)
        else:
            previous_page = 'N/A'
        if total_files > file_list_no * 10:
            next_page = "userfiles_" + str(file_list_no + 1)
        else:
            next_page = 'N/A'
        page_counter = f"{file_list_no}/{math.ceil(total_files/10)}"
        rows.append([
            InlineKeyboardButton("◄", callback_data=previous_page),
            InlineKeyboardButton(page_counter, callback_data="N/A"),
            InlineKeyboardButton("►", callback_data=next_page),
        ])

    if not rows:
        rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
    rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
    return rows, total_files
|
|
|
|
|
async def gen_file_menu(_id, file_list_no, update: CallbackQuery):
    """Show the detail menu (links, send, revoke, back) for one stored file.

    Looks the record up with ``db.get_file``; when that raises
    ``FileNotFound`` the callback is answered with "File Not Found" and
    nothing else happens. Otherwise the caption of the message behind
    ``update`` is replaced with the file's name, size, type and creation
    date, together with the action buttons.

    Args:
        _id: Identifier passed to ``db.get_file``.
        file_list_no: Page of the file list the user came from; embedded in
            the "revoke" and "back" payloads.
        update: Callback query whose message caption is edited.
    """
    try:
        myfile_info = await db.get_file(_id)
    except FileNotFound:
        await update.answer("File Not Found")
        return

    decoded_type = FileId.decode(myfile_info['file']['file_id']).file_type
    if decoded_type in PHOTO_TYPES:
        file_type = "Image"
    elif decoded_type == FileType.VOICE:
        file_type = "Voice"
    elif decoded_type in (FileType.VIDEO, FileType.ANIMATION,
                          FileType.VIDEO_NOTE):
        file_type = "Video"
    elif decoded_type == FileType.DOCUMENT:
        file_type = "Document"
    elif decoded_type == FileType.STICKER:
        file_type = "Sticker"
    elif decoded_type == FileType.AUDIO:
        file_type = "Audio"
    else:
        file_type = "Unknown"

    page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
    stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"

    # Only videos get the extra "stream" link next to the download link.
    link_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]
    if file_type == "Video":
        link_row.insert(0, InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link))

    MYFILES_BUTTONS = InlineKeyboardMarkup([
        link_row,
        [
            InlineKeyboardButton(
                "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
            InlineKeyboardButton(
                "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
                callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
        ],
        [
            InlineKeyboardButton(
                "ʙᴀᴄᴋ", callback_data="userfiles_{}".format(file_list_no))
        ]
    ])

    # The stored time may be a preformatted string or a timestamp. Previously
    # only exact floats were converted, so any other non-string value (for
    # example an int timestamp) crashed on an unbound variable.
    created = myfile_info['time']
    if isinstance(created, str):
        created_on = created
    elif isinstance(created, (int, float)):
        created_on = datetime.datetime.fromtimestamp(created).date()
    elif isinstance(created, datetime.datetime):
        created_on = created.date()
    else:
        created_on = created

    await update.edit_message_caption(
        caption="**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
        .format(myfile_info['file']['file_name'],
                humanbytes(int(myfile_info['file']['file_size'])), file_type,
                created_on),
        reply_markup=MYFILES_BUTTONS)
|
|
|
|
|
|
|
async def gen_privfile_list_button(file_list_no: int, user_id: int):
    """Build the keyboard rows for one ten-entry page of private files.

    Args:
        file_list_no: 1-based page number; selects entries
            ``(page - 1) * 10 + 1`` through ``page * 10``.
        user_id: Passed to ``db.find_privfiles`` together with that range.

    Returns:
        ``(rows, total_files)``: the button rows (files, optional navigation
        row, closing row) and the count returned by ``db.find_privfiles``.
    """
    page_end = file_list_no * 10
    file_range = [page_end - 10 + 1, page_end]
    user_files, total_files = await db.find_privfiles(user_id, file_range)

    keyboard = []
    async for record in user_files:
        label = f"📦 {record['file']['caption']}"
        target = f"myprivfile_{record['_id']}_{file_list_no}"
        keyboard.append([InlineKeyboardButton(label, callback_data=target)])

    if total_files > 10:
        has_previous = file_list_no > 1
        has_next = total_files > file_list_no * 10
        # An arrow with nowhere to go is wired to the inert "N/A" payload.
        back_arrow = InlineKeyboardButton(
            "◄",
            callback_data=("userprivfiles_" + str(file_list_no - 1))
            if has_previous else 'N/A')
        position = InlineKeyboardButton(
            f"{file_list_no}/{math.ceil(total_files/10)}",
            callback_data="N/A")
        forward_arrow = InlineKeyboardButton(
            "►",
            callback_data=("userprivfiles_" + str(file_list_no + 1))
            if has_next else 'N/A')
        keyboard.append([back_arrow, position, forward_arrow])

    if len(keyboard) == 0:
        keyboard.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
    keyboard.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
    return keyboard, total_files
|
|
|
|
|
async def gen_privfile_menu(_id, file_list_no, update: CallbackQuery):
    """Show the detail menu (links, send, revoke, back) for a private file.

    Looks the record up with ``db.get_privfile``; when that raises
    ``FileNotFound`` the callback is answered with "File Not Found" and
    nothing else happens. Otherwise the caption of the message behind
    ``update`` is replaced with the file's name, size, type and creation
    date, together with the action buttons.

    Args:
        _id: Identifier passed to ``db.get_privfile``.
        file_list_no: Page of the private file list the user came from;
            embedded in the "revoke" and "back" payloads.
        update: Callback query whose message caption is edited.
    """
    try:
        myfile_info = await db.get_privfile(_id)
    except FileNotFound:
        await update.answer("File Not Found")
        return

    file_doc = myfile_info['file']
    decoded_type = FileId.decode(file_doc['file_id']).file_type
    if decoded_type in PHOTO_TYPES:
        file_type = "Image"
    elif decoded_type == FileType.VOICE:
        file_type = "Voice"
    elif decoded_type in (FileType.VIDEO, FileType.ANIMATION,
                          FileType.VIDEO_NOTE):
        file_type = "Video"
    elif decoded_type == FileType.DOCUMENT:
        file_type = "Document"
    elif decoded_type == FileType.STICKER:
        file_type = "Sticker"
    elif decoded_type == FileType.AUDIO:
        file_type = "Audio"
    else:
        file_type = "Unknown"

    page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
    stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"

    # Only videos get the extra "stream" link next to the download link.
    link_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]
    if file_type == "Video":
        link_row.insert(0, InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link))

    MYFILES_BUTTONS = InlineKeyboardMarkup([
        link_row,
        [
            InlineKeyboardButton(
                "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
            InlineKeyboardButton(
                "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
                callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
        ],
        [
            # Return to the private list that opened this menu; this used to
            # point at the public "userfiles_" list.
            InlineKeyboardButton(
                "ʙᴀᴄᴋ",
                callback_data="userprivfiles_{}".format(file_list_no))
        ]
    ])

    # The other menus read the name from the nested "file" document, which is
    # also where this function reads the id and size. Prefer that location
    # and fall back to the top-level key the original code used.
    if 'file_name' in file_doc:
        file_name = file_doc['file_name']
    else:
        file_name = myfile_info['file_name']

    # The stored time may be a preformatted string or a timestamp. Previously
    # only exact floats were converted, so any other non-string value (for
    # example an int timestamp) crashed on an unbound variable.
    created = myfile_info['time']
    if isinstance(created, str):
        created_on = created
    elif isinstance(created, (int, float)):
        created_on = datetime.datetime.fromtimestamp(created).date()
    elif isinstance(created, datetime.datetime):
        created_on = created.date()
    else:
        created_on = created

    await update.edit_message_caption(
        caption=
        "**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
        .format(file_name,
                humanbytes(int(file_doc['file_size'])), file_type,
                created_on),
        reply_markup=MYFILES_BUTTONS)
|
|
|
|
|
|
|
async def gen_allfile_list_button(file_list_no: int, user_id: int):
    """Build the keyboard rows for one ten-entry page of the all-files list.

    Args:
        file_list_no: 1-based page number; selects entries
            ``(page - 1) * 10 + 1`` through ``page * 10``.
        user_id: Accepted for signature parity with the other list builders;
            it is not passed to ``db.find_all_files``.

    Returns:
        ``(rows, total_files)``: the button rows (files, optional navigation
        row, closing row) and the count returned by ``db.find_all_files``.
    """
    upper = file_list_no * 10
    lower = upper - 10 + 1
    user_files, total_files = await db.find_all_files([lower, upper])

    def nav_payload(target_page: int, enabled: bool) -> str:
        # Disabled arrows share the inert payload used by the page counter.
        return "userallfiles_" + str(target_page) if enabled else 'N/A'

    buttons = []
    async for item in user_files:
        file_button = InlineKeyboardButton(
            f"📦 {item['file']['caption']}",
            callback_data=f"allfile_{item['_id']}_{file_list_no}")
        buttons.append([file_button])

    if total_files > 10:
        navigation = [
            InlineKeyboardButton(
                "◄",
                callback_data=nav_payload(file_list_no - 1, file_list_no > 1)),
            InlineKeyboardButton(
                f"{file_list_no}/{math.ceil(total_files/10)}",
                callback_data="N/A"),
            InlineKeyboardButton(
                "►",
                callback_data=nav_payload(file_list_no + 1,
                                          total_files > file_list_no * 10)),
        ]
        buttons.append(navigation)

    if not buttons:
        buttons.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])
    buttons.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
    return buttons, total_files
|
|
|
|
|
async def gen_allfile_menu(_id, file_list_no, update: CallbackQuery):
    """Show the detail menu for a file opened from the all-files list.

    Looks the record up with ``db.get_file``; when that raises
    ``FileNotFound`` the callback is answered with "File Not Found" and
    nothing else happens. Otherwise the caption of the message behind
    ``update`` is replaced with the file's name, size, type and creation
    date, together with the action buttons.

    Args:
        _id: Identifier passed to ``db.get_file``.
        file_list_no: Page of the all-files list the user came from;
            embedded in the "revoke" and "back" payloads.
        update: Callback query whose message caption is edited.
    """
    try:
        myfile_info = await db.get_file(_id)
    except FileNotFound:
        await update.answer("File Not Found")
        return

    decoded_type = FileId.decode(myfile_info['file']['file_id']).file_type
    if decoded_type in PHOTO_TYPES:
        file_type = "Image"
    elif decoded_type == FileType.VOICE:
        file_type = "Voice"
    elif decoded_type in (FileType.VIDEO, FileType.ANIMATION,
                          FileType.VIDEO_NOTE):
        file_type = "Video"
    elif decoded_type == FileType.DOCUMENT:
        file_type = "Document"
    elif decoded_type == FileType.STICKER:
        file_type = "Sticker"
    elif decoded_type == FileType.AUDIO:
        file_type = "Audio"
    else:
        file_type = "Unknown"

    page_link = f"{Server.URL}app/watch/{myfile_info['_id']}"
    stream_link = f"{Server.URL}api/dl/{myfile_info['_id']}"

    # Only videos get the extra "stream" link next to the download link.
    link_row = [InlineKeyboardButton("ᴅᴏᴡɴʟᴏᴀᴅ", url=stream_link)]
    if file_type == "Video":
        link_row.insert(0, InlineKeyboardButton("sᴛʀᴇᴀᴍ", url=page_link))

    MYFILES_BUTTONS = InlineKeyboardMarkup([
        link_row,
        [
            InlineKeyboardButton(
                "ɢᴇᴛ ғɪʟᴇ", callback_data=f"sendfile_{myfile_info['_id']}"),
            InlineKeyboardButton(
                "ʀᴇᴠᴏᴋᴇ ғɪʟᴇ",
                callback_data=f"msgdelete_{myfile_info['_id']}_{file_list_no}")
        ],
        [
            # Return to the all-files list that opened this menu; this used
            # to point at the per-user "userfiles_" list.
            InlineKeyboardButton(
                "ʙᴀᴄᴋ",
                callback_data="userallfiles_{}".format(file_list_no))
        ]
    ])

    # The stored time may be a preformatted string or a timestamp. Previously
    # only exact floats were converted, so any other non-string value (for
    # example an int timestamp) crashed on an unbound variable.
    created = myfile_info['time']
    if isinstance(created, str):
        created_on = created
    elif isinstance(created, (int, float)):
        created_on = datetime.datetime.fromtimestamp(created).date()
    elif isinstance(created, datetime.datetime):
        created_on = created.date()
    else:
        created_on = created

    await update.edit_message_caption(
        caption=
        "**File Name :** `{}`\n**File Size :** `{}`\n**File Type :** `{}`\n**Created On :** `{}`"
        .format(myfile_info['file']['file_name'],
                humanbytes(int(myfile_info['file']['file_size'])), file_type,
                created_on),
        reply_markup=MYFILES_BUTTONS)
|
|
|
|
|
async def delete_user_file(_id, file_list_no: int, update: CallbackQuery):
    """Delete a stored file and confirm the deletion in the message caption.

    The record is looked up with ``db.get_file`` first and, when that finds
    nothing, with ``db.get_privfile``. If neither lookup yields a record (or
    the deletion itself raises ``FileNotFound``) the callback is answered
    with "File Already Deleted". The link counter is decremented only when
    the record came from ``db.get_file``.

    Args:
        _id: Identifier of the record to delete.
        file_list_no: Page the user came from (currently unused; the "back"
            button always returns to page 1).
        update: Callback query whose message caption is edited.
    """
    # A lookup may signal "missing" by raising FileNotFound or by returning a
    # falsy value; treat both the same so the private fallback is reachable.
    try:
        myfile_info = await db.get_file(_id)
    except FileNotFound:
        myfile_info = None
    found_with_get_file = bool(myfile_info)

    if not found_with_get_file:
        try:
            # Previously this referenced an undefined name (`db_id`).
            myfile_info = await db.get_privfile(_id)
        except FileNotFound:
            myfile_info = None
        if not myfile_info:
            await update.answer("File Already Deleted")
            return

    try:
        if found_with_get_file:
            await db.delete_one_file(myfile_info['_id'])
            await db.count_links(update.from_user.id, "-")
        else:
            await db.delete_one_privfile(myfile_info['_id'])
    except FileNotFound:
        await update.answer("File Already Deleted")
        return

    # Keep the rest of the confirmation caption and drop only the question.
    previous_caption = update.message.caption or ""
    await update.message.edit_caption(
        caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**" +
        previous_caption.replace("Cᴏɴғɪʀᴍ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴅᴇʟᴇᴛᴇ ᴛʜᴇ Fɪʟᴇ", ""),
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("ʙᴀᴄᴋ", callback_data="userfiles_1")]]))
|
|
|
|
|
async def delete_user_filex(_id, update: CallbackQuery):
    """Delete a stored file and replace the caption with a plain confirmation.

    The record is looked up with ``db.get_file`` first and, when that finds
    nothing, with ``db.get_privfile``. If neither lookup yields a record (or
    the deletion itself raises ``FileNotFound``) the callback is answered
    with "File Already Deleted". The link counter is decremented only when
    the record came from ``db.get_file``.

    Args:
        _id: Identifier of the record to delete.
        update: Callback query whose message caption is edited.
    """
    # A lookup may signal "missing" by raising FileNotFound or by returning a
    # falsy value; treat both the same so the private fallback is reachable
    # (a raise from the first lookup used to skip it entirely).
    try:
        myfile_info = await db.get_file(_id)
    except FileNotFound:
        myfile_info = None
    found_with_get_file = bool(myfile_info)

    if not found_with_get_file:
        try:
            myfile_info = await db.get_privfile(_id)
        except FileNotFound:
            myfile_info = None
        if not myfile_info:
            await update.answer("File Already Deleted")
            return

    try:
        if found_with_get_file:
            await db.delete_one_file(myfile_info['_id'])
            await db.count_links(update.from_user.id, "-")
        else:
            await db.delete_one_privfile(myfile_info['_id'])
    except FileNotFound:
        await update.answer("File Already Deleted")
        return

    await update.message.edit_caption(
        caption="**Fɪʟᴇ Dᴇʟᴇᴛᴇᴅ Sᴜᴄᴄᴇssғᴜʟʟʏ !**\n\n",
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]]))
|
|