Spaces:
Running
Running
| import math | |
| import logging | |
| import asyncio | |
| import traceback | |
| from pyrogram import filters, Client | |
| from pyrogram.enums.parse_mode import ParseMode | |
| from pyrogram.types import BotCommand, Message, ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton | |
| from FileStream import __version__ | |
| from FileStream.bot import FileStream | |
| from FileStream.Exceptions import FIleNotFound | |
| from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users | |
| from FileStream.config import Telegram | |
| from FileStream.Database import Database | |
| from FileStream.utils.FileProcessors.translation import LANG, BUTTON | |
| db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
| async def menu(bot, message): | |
| await bot.send_message( | |
| chat_id=message.from_user.id, | |
| text="ꜱᴇʟᴇᴄᴛ ꜰʀᴏᴍ ᴍᴇɴᴜ", | |
| reply_markup=ReplyKeyboardMarkup( | |
| [[("ꜰɪʟᴇ ʙᴀɴᴋ"),("ꜱᴇᴀʀᴄʜ")], [("ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), ("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ")]], | |
| one_time_keyboard=False, | |
| resize_keyboard=True, | |
| placeholder="🄼🄰🄸🄽 🄼🄴🄽🅄 👉")) | |
| async def start(bot: Client, message: Message, response): | |
| if response['status'] == "AUTHORIZED": | |
| usr_cmd = message.text.split("_")[-1] | |
| if usr_cmd == "/start": | |
| await bot.set_bot_commands([ | |
| BotCommand("start", "ꜱᴛᴀʀᴛ ᴛʜᴇ ʙᴏᴛ"), | |
| BotCommand("help", "ʙᴏᴛ ꜱᴇᴛᴛɪɴɢꜱ"), | |
| BotCommand("about", "ᴀʙᴏᴜᴛ ᴛʜᴇ ʙᴏᴛ"), | |
| BotCommand("search", "ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), | |
| BotCommand("filebank", "ꜰɪʟᴇ ʙᴀɴᴋ"), | |
| BotCommand("files", "ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), | |
| BotCommand("myfiles", "ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), | |
| ]) | |
| await menu(bot, message) | |
| if Telegram.START_PIC: | |
| await message.reply_photo(photo=Telegram.START_PIC, | |
| caption=LANG.START_TEXT.format( | |
| message.from_user.mention, | |
| FileStream.username), | |
| parse_mode=ParseMode.HTML, | |
| reply_markup=BUTTON.START_BUTTONS) | |
| else: | |
| await message.reply_text(text=LANG.START_TEXT.format( | |
| message.from_user.mention, FileStream.username), | |
| parse_mode=ParseMode.HTML, | |
| disable_web_page_preview=True, | |
| reply_markup=BUTTON.START_BUTTONS) | |
| else: | |
| if "stream_" in message.text: | |
| try: | |
| file_check = await db.get_file(usr_cmd) | |
| file_id = str(file_check['_id']) | |
| if file_id == usr_cmd: | |
| reply_markup, stream_text = await gen_linkx( | |
| m=message, | |
| _id=file_id, | |
| name=[FileStream.username, FileStream.fname]) | |
| await message.reply_text(text=stream_text, | |
| parse_mode=ParseMode.HTML, | |
| disable_web_page_preview=True, | |
| reply_markup=reply_markup, | |
| quote=True) | |
| except FIleNotFound as e: | |
| await message.reply_text("File Not Found") | |
| except Exception as e: | |
| await message.reply_text("Something Went Wrong") | |
| logging.error(e) | |
| elif "file_" in message.text: | |
| try: | |
| file_check = await db.get_file(usr_cmd) | |
| db_id = str(file_check['_id']) | |
| file_id = file_check['file']['file_id'] | |
| file_name = file_check['file']['file_name'] | |
| if db_id == usr_cmd: | |
| filex = await message.reply_cached_media( | |
| file_id=file_id, caption=f'**{file_name}**') | |
| await asyncio.sleep(3600) | |
| try: | |
| await filex.delete() | |
| await message.delete() | |
| except Exception: | |
| pass | |
| except FIleNotFound as e: | |
| await message.reply_text("**File Not Found**") | |
| except Exception as e: | |
| await message.reply_text("Something Went Wrong") | |
| logging.error(e) | |
| else: | |
| await message.reply_text(f"**Invalid Command**") | |
| async def about_handler(bot, message): | |
| if Telegram.START_PIC: | |
| await message.reply_photo(photo=Telegram.START_PIC, | |
| caption=LANG.ABOUT_TEXT.format( | |
| FileStream.fname, __version__), | |
| parse_mode=ParseMode.HTML, | |
| reply_markup=BUTTON.ABOUT_BUTTONS) | |
| else: | |
| await message.reply_text(text=LANG.ABOUT_TEXT.format( | |
| FileStream.fname, __version__), | |
| disable_web_page_preview=True, | |
| reply_markup=BUTTON.ABOUT_BUTTONS) | |
| async def help_handler(bot, message): | |
| if Telegram.START_PIC: | |
| await message.reply_photo(photo=Telegram.START_PIC, | |
| caption=LANG.HELP_TEXT.format(Telegram.OWNER_ID), | |
| parse_mode=ParseMode.HTML, | |
| reply_markup=BUTTON.HELP_BUTTONS) | |
| else: | |
| await message.reply_text(text=LANG.HELP_TEXT.format(Telegram.OWNER_ID), | |
| parse_mode=ParseMode.HTML, | |
| disable_web_page_preview=True, | |
| reply_markup=BUTTON.HELP_BUTTONS) | |