|
import math |
|
import logging |
|
import asyncio |
|
import traceback |
|
from pyrogram import filters, Client |
|
from pyrogram.enums.parse_mode import ParseMode |
|
from pyrogram.types import BotCommand, Message, ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton |
|
|
|
from FileStream import __version__ |
|
from FileStream.bot import FileStream |
|
from FileStream.Exceptions import FileNotFound |
|
from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users |
|
from FileStream.config import Telegram |
|
from FileStream.Database import Database |
|
from FileStream.utils.FileProcessors.translation import LANG, BUTTON |
|
|
|
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) |
|
|
|
|
|
async def menu(bot, message): |
|
await bot.send_message( |
|
chat_id=message.from_user.id, |
|
text="ꜱᴇʟᴇᴄᴛ ꜰʀᴏᴍ ᴍᴇɴᴜ", |
|
reply_markup=ReplyKeyboardMarkup( |
|
[[("ꜰɪʟᴇ ʙᴀɴᴋ"),("ꜱᴇᴀʀᴄʜ")], [("ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), ("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ")]], |
|
one_time_keyboard=False, |
|
resize_keyboard=True, |
|
placeholder="🄼🄰🄸🄽 🄼🄴🄽🅄 👉")) |
|
|
|
|
|
@FileStream.on_message(filters.command('start') & filters.private) |
|
@verify_users |
|
async def start(bot: Client, message: Message, response): |
|
if response['status'] == "AUTHORIZED": |
|
usr_cmd = message.text.split("_")[-1] |
|
if usr_cmd == "/start": |
|
await bot.set_bot_commands([ |
|
BotCommand("start", "ꜱᴛᴀʀᴛ ᴛʜᴇ ʙᴏᴛ"), |
|
BotCommand("help", "ʙᴏᴛ ꜱᴇᴛᴛɪɴɢꜱ"), |
|
BotCommand("about", "ᴀʙᴏᴜᴛ ᴛʜᴇ ʙᴏᴛ"), |
|
BotCommand("search", "ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), |
|
BotCommand("filebank", "ꜰɪʟᴇ ʙᴀɴᴋ"), |
|
BotCommand("files", "ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), |
|
BotCommand("myfiles", "ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), |
|
BotCommand("admin", "ɪ ᴀᴍ ᴀᴅᴍɪɴ") |
|
]) |
|
await menu(bot, message) |
|
if Telegram.START_PIC: |
|
await message.reply_photo(photo=Telegram.START_PIC, |
|
caption=LANG.START_TEXT.format( |
|
message.from_user.mention, |
|
FileStream.username), |
|
parse_mode=ParseMode.HTML, |
|
reply_markup=BUTTON.START_BUTTONS) |
|
else: |
|
await message.reply_text(text=LANG.START_TEXT.format( |
|
message.from_user.mention, FileStream.username), |
|
parse_mode=ParseMode.HTML, |
|
disable_web_page_preview=True, |
|
reply_markup=BUTTON.START_BUTTONS) |
|
else: |
|
if "stream_" in message.text: |
|
try: |
|
file_check = await db.get_file(usr_cmd) |
|
file_id = str(file_check['_id']) |
|
if file_id == usr_cmd: |
|
reply_markup, stream_text = await gen_linkx( |
|
m=message, |
|
_id=file_id, |
|
name=[FileStream.username, FileStream.fname]) |
|
await message.reply_text(text=stream_text, |
|
parse_mode=ParseMode.HTML, |
|
disable_web_page_preview=True, |
|
reply_markup=reply_markup, |
|
quote=True) |
|
|
|
except FIleNotFound as e: |
|
await message.reply_text("File Not Found") |
|
except Exception as e: |
|
await message.reply_text("Something Went Wrong") |
|
logging.error(e) |
|
|
|
elif "file_" in message.text: |
|
try: |
|
file_check = await db.get_file(usr_cmd) |
|
db_id = str(file_check['_id']) |
|
file_id = file_check['file']['file_id'] |
|
file_name = file_check['file']['file_name'] |
|
if db_id == usr_cmd: |
|
filex = await message.reply_cached_media( |
|
file_id=file_id, caption=f'**{file_name}**') |
|
await asyncio.sleep(3600) |
|
try: |
|
await filex.delete() |
|
await message.delete() |
|
except Exception: |
|
pass |
|
|
|
except FIleNotFound as e: |
|
await message.reply_text("**File Not Found**") |
|
except Exception as e: |
|
await message.reply_text("Something Went Wrong") |
|
logging.error(e) |
|
|
|
else: |
|
await message.reply_text(f"**Invalid Command**") |
|
|
|
|
|
@FileStream.on_message(filters.private & filters.command(["about"])) |
|
async def about_handler(bot, message): |
|
if Telegram.START_PIC: |
|
await message.reply_photo(photo=Telegram.START_PIC, |
|
caption=LANG.ABOUT_TEXT.format(FileStream.fname, __version__), |
|
parse_mode=ParseMode.HTML, |
|
reply_markup=BUTTON.ABOUT_BUTTONS) |
|
else: |
|
await message.reply_text(text=LANG.ABOUT_TEXT.format(FileStream.fname, __version__), |
|
disable_web_page_preview=True, |
|
reply_markup=BUTTON.ABOUT_BUTTONS) |
|
|
|
|
|
@FileStream.on_message((filters.command('help')) & filters.private) |
|
async def help_handler(bot, message): |
|
if Telegram.START_PIC: |
|
await message.reply_photo(photo=Telegram.START_PIC, |
|
caption=LANG.HELP_TEXT.format(Telegram.OWNER_ID), |
|
parse_mode=ParseMode.HTML, |
|
reply_markup=BUTTON.HELP_BUTTONS) |
|
else: |
|
await message.reply_text(text=LANG.HELP_TEXT.format(Telegram.OWNER_ID), |
|
parse_mode=ParseMode.HTML, |
|
disable_web_page_preview=True, |
|
reply_markup=BUTTON.HELP_BUTTONS) |
|
|