import math import asyncio import logging from pyrogram import filters, Client from pyrogram.types import BotCommand from pyrogram.enums.parse_mode import ParseMode from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo #----------------------------Local Imports ---------------------# from FileStream import __version__ from FileStream.bot import FileStream from FileStream.config import Telegram from FileStream.Database import Database from FileStream.Exceptions import FIleNotFound from FileStream.utils.FileProcessors.human_readable import humanbytes from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users from FileStream.utils.FileProcessors.translation import LANG, BUTTON # Set new commands db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) #----------------------------search Commands ---------------------# @FileStream.on_message((filters.command('search') & filters.private) | (filters.regex("ꜱᴇᴀʀᴄʜ") & filters.private)) @verify_users async def search_files(bot: Client, message: Message, response): await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton( 'Search', switch_inline_query_current_chat='') ]])) @FileStream.on_message(filters.command("webview")) @verify_users async def send_webview_button(bot: Client, message: Message, response): # Define the inline keyboard with a button that opens a web page webapp_button = InlineKeyboardMarkup( [ [InlineKeyboardButton("Open WebApp", web_app=WebAppInfo(url="https://youtube.com"))] ] ) await message.reply_text( "Click the button below to open the WebApp:", reply_markup=webapp_button ) #-------------------------Files---------------------------------# @FileStream.on_message((filters.command('files') & filters.private) | (filters.regex("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ") & filters.private)) @verify_users async def my_files(bot: Client, message: Message, response): user_files, total_files = await db.find_files(message.from_user.id, [1, 10]) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton(f"📦 {x['file']['caption']}", callback_data=f"myfile_{x['_id']}_{1}") ]) if total_files > 10: file_list.append([ InlineKeyboardButton("◄", callback_data="N/A"), InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", callback_data="N/A"), InlineKeyboardButton("►", callback_data="userfiles_2") ], ) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) await message.reply_photo(photo=Telegram.FILE_PIC, caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) # -----------------------------Bot Private Files Command -----------------------------------------------# @FileStream.on_message((filters.command('myfiles') & filters.private) | (filters.regex("ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ") & filters.private)) @verify_users async def my_privfiles(bot: Client, message: Message, response): user_files, total_files = await db.find_private_files( message.from_user.id, [1, 10]) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton(f" 📦 {x['file']['caption']}",callback_data=f"myprivfile_{x['_id']}_{1}") ]) if total_files > 10: file_list.append([ InlineKeyboardButton("◄", callback_data="N/A"), InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", callback_data="N/A"), InlineKeyboardButton("►", callback_data="userfiles_2") ], ) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) await message.reply_photo(photo=Telegram.FILE_PIC, caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) # -----------------------------Bot All Available Files Command ----------------------------------------# @FileStream.on_message((filters.command('filebank') & filters.private) | (filters.regex("ꜰɪʟᴇ ʙᴀɴᴋ") & filters.private)) @verify_users async def my_filebank(bot: Client, message: Message, response): if await db.is_admin(message.from_user.id): user_files, total_files = await db.find_all_files([1, 10]) else: user_files, total_files = await db.find_all_public_files([1, 10]) file_list = [] async for x in user_files: file_list.append([ InlineKeyboardButton(f"📦 {x['file']['caption']}",callback_data=f"allfile_{x['_id']}_{1}")]) if total_files > 10: file_list.append([ InlineKeyboardButton("◄", callback_data="N/A"), InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"), InlineKeyboardButton("►", callback_data="userallfiles_2") ], ) if not file_list: file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) await message.reply_photo(photo=Telegram.FILE_PIC, caption="Total files: {}".format(total_files), reply_markup=InlineKeyboardMarkup(file_list)) @FileStream.on_inline_query() async def handle_inline_query(client, query): results = [] if '|' in query.query: text, file_type = query.query.split('|', maxsplit=1) text = text.strip() file_type = file_type.strip().lower() else: text = query.query.strip() file_type = None offset = int(query.offset or 0) files, next_offset = await db.get_search_results(text,file_type=file_type,max_results=10,offset=offset) for file in files: results.append( InlineQueryResultCachedDocument( title=file['file']['file_name'], document_file_id=file['file']['file_id'], caption=file['file']['file_name'] or "", description= f" Size: {humanbytes(file['file']['file_size'])}\nType:{file['file']['mime_type']} ", reply_markup=InlineKeyboardMarkup([[ InlineKeyboardButton('Search', switch_inline_query_current_chat='') ]]))) if results: switch_pm_text = f"Results" if text: switch_pm_text += f" for {text}" await query.answer(results=results, cache_time=300, switch_pm_text=str(switch_pm_text), switch_pm_parameter="start", next_offset=str(next_offset) ) else: switch_pm_text = f'No results' if text: switch_pm_text += f' for "{text}"' await query.answer(results=[],cache_time=300,switch_pm_text=switch_pm_text,switch_pm_parameter="okay") #await inline_query.answer(results=result)