Spaces:
Running
Running
import math | |
import asyncio | |
import logging | |
from pyrogram import filters, Client | |
from pyrogram.types import BotCommand | |
from pyrogram.enums.parse_mode import ParseMode | |
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto | |
from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo | |
#----------------------------Local Imports ---------------------# | |
from FileStream import __version__ | |
from FileStream.bot import FileStream | |
from FileStream.config import Telegram | |
from FileStream.Database import Database | |
from FileStream.Exceptions import FileNotFound | |
from FileStream.utils.FileProcessors.human_readable import humanbytes | |
from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users | |
from FileStream.utils.FileProcessors.translation import LANG, BUTTON | |
# Set new commands | |
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
#----------------------------search Commands ---------------------# | |
async def search_files(bot: Client, message: Message, response): | |
await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton( | |
'Search', switch_inline_query_current_chat='') | |
]])) | |
async def send_webview_button(bot: Client, message: Message, response): | |
# Define the inline keyboard with a button that opens a web page | |
webapp_button = InlineKeyboardMarkup( | |
[ | |
[InlineKeyboardButton("Open WebApp", web_app=WebAppInfo(url="https://youtube.com"))] | |
] | |
) | |
await message.reply_text( | |
"Click the button below to open the WebApp:", | |
reply_markup=webapp_button | |
) | |
#-------------------------Files---------------------------------# | |
async def my_files(bot: Client, message: Message, response): | |
user_files, total_files = await db.find_files(message.from_user.id, [1, 10]) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton(f"📦 {x['file']['caption']}", callback_data=f"myfile_{x['_id']}_{1}") | |
]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton("◄", callback_data="N/A"), | |
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"), | |
InlineKeyboardButton("►", callback_data="userfiles_2") | |
], ) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
await message.reply_photo(photo=Telegram.FILE_PIC, | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
# -----------------------------Bot Private Files Command -----------------------------------------------# | |
async def my_privfiles(bot: Client, message: Message, response): | |
user_files, total_files = await db.find_private_files( | |
message.from_user.id, [1, 10]) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton(f" 📦 {x['file']['caption']}",callback_data=f"myprivfile_{x['_id']}_{1}") | |
]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton("◄", callback_data="N/A"), | |
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", | |
callback_data="N/A"), | |
InlineKeyboardButton("►", callback_data="userfiles_2") | |
], ) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
await message.reply_photo(photo=Telegram.FILE_PIC, | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
# -----------------------------Bot All Available Files Command ----------------------------------------# | |
async def my_filebank(bot: Client, message: Message, response): | |
if await db.is_admin(message.from_user.id): | |
user_files, total_files = await db.find_all_files([1, 10]) | |
else: | |
user_files, total_files = await db.find_all_public_files([1, 10]) | |
file_list = [] | |
async for x in user_files: | |
file_list.append([ | |
InlineKeyboardButton(f"📦 {x['file']['caption']}",callback_data=f"allfile_{x['_id']}_{1}")]) | |
if total_files > 10: | |
file_list.append([ | |
InlineKeyboardButton("◄", callback_data="N/A"), | |
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"), | |
InlineKeyboardButton("►", callback_data="userallfiles_2") | |
], ) | |
if not file_list: | |
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) | |
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) | |
await message.reply_photo(photo=Telegram.FILE_PIC, | |
caption="Total files: {}".format(total_files), | |
reply_markup=InlineKeyboardMarkup(file_list)) | |
async def handle_inline_query(client, query): | |
results = [] | |
if '|' in query.query: | |
text, file_type = query.query.split('|', maxsplit=1) | |
text = text.strip() | |
file_type = file_type.strip().lower() | |
else: | |
text = query.query.strip() | |
file_type = None | |
offset = int(query.offset or 0) | |
files, next_offset = await db.get_search_results(text,file_type=file_type,max_results=10,offset=offset) | |
for file in files: | |
results.append( | |
InlineQueryResultCachedDocument( | |
title=file['file']['file_name'], | |
document_file_id=file['file']['file_id'], | |
caption=file['file']['file_name'] or "", | |
description=f"Size: {humanbytes(file['file']['file_size'])}\nType:{file['file']['mime_type']} ", | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton('Search',switch_inline_query_current_chat='')]]) | |
) | |
) | |
if results: | |
switch_pm_text = f"Results" | |
if text: | |
switch_pm_text += f" for {text}" | |
await query.answer(results=results, cache_time=300, switch_pm_text=str(switch_pm_text), switch_pm_parameter="start", next_offset=str(next_offset) ) | |
else: | |
switch_pm_text = f'No results' | |
if text: | |
switch_pm_text += f' for "{text}"' | |
await query.answer(results=[],cache_time=300,switch_pm_text=switch_pm_text,switch_pm_parameter="okay") | |
#await inline_query.answer(results=result) | |