|
import math |
|
import asyncio |
|
import logging |
|
from pyrogram import filters, Client |
|
from pyrogram.types import BotCommand |
|
from pyrogram.enums.parse_mode import ParseMode |
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto |
|
from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo |
|
|
|
from FileStream import __version__ |
|
from FileStream.bot import FileStream |
|
from FileStream.config import Telegram |
|
from FileStream.Database import Database |
|
from FileStream.Exceptions import FileNotFound |
|
from FileStream.utils.FileProcessors.human_readable import humanbytes |
|
from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users |
|
from FileStream.utils.FileProcessors.translation import LANG, BUTTON |
|
|
|
|
|
|
|
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) |
|
|
|
|
|
|
|
@FileStream.on_message((filters.command('search') & filters.private) | (filters.regex("ꜱᴇᴀʀᴄʜ") & filters.private)) |
|
@verify_users |
|
async def search_files(bot: Client, message: Message, response): |
|
|
|
await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ", |
|
reply_markup=InlineKeyboardMarkup([[ |
|
InlineKeyboardButton( |
|
'Search', switch_inline_query_current_chat='') |
|
]])) |
|
|
|
@FileStream.on_message(filters.command("webview")) |
|
@verify_users |
|
async def send_webview_button(bot: Client, message: Message, response): |
|
|
|
webapp_button = InlineKeyboardMarkup( |
|
[ |
|
[InlineKeyboardButton("Open WebApp", web_app=WebAppInfo(url="https://youtube.com"))] |
|
] |
|
) |
|
|
|
await message.reply_text( |
|
"Click the button below to open the WebApp:", |
|
reply_markup=webapp_button |
|
|
|
) |
|
|
|
@FileStream.on_message((filters.command('files') & filters.private) | (filters.regex("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ") & filters.private)) |
|
@verify_users |
|
async def my_files(bot: Client, message: Message, response): |
|
|
|
user_files, total_files = await db.find_files(message.from_user.id, [1, 10]) |
|
|
|
file_list = [] |
|
async for x in user_files: |
|
file_list.append([ |
|
InlineKeyboardButton(f"📦 {x['file']['caption']}", callback_data=f"myfile_{x['_id']}_{1}") |
|
]) |
|
if total_files > 10: |
|
file_list.append([ |
|
InlineKeyboardButton("◄", callback_data="N/A"), |
|
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"), |
|
InlineKeyboardButton("►", callback_data="userfiles_2") |
|
], ) |
|
if not file_list: |
|
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) |
|
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) |
|
await message.reply_photo(photo=Telegram.FILE_PIC, |
|
caption="Total files: {}".format(total_files), |
|
reply_markup=InlineKeyboardMarkup(file_list)) |
|
|
|
|
|
|
|
@FileStream.on_message((filters.command('myfiles') & filters.private) | (filters.regex("ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ") & filters.private)) |
|
@verify_users |
|
async def my_privfiles(bot: Client, message: Message, response): |
|
user_files, total_files = await db.find_private_files( |
|
message.from_user.id, [1, 10]) |
|
|
|
file_list = [] |
|
async for x in user_files: |
|
file_list.append([ |
|
InlineKeyboardButton(f" 📦 {x['file']['caption']}",callback_data=f"myprivfile_{x['_id']}_{1}") |
|
]) |
|
if total_files > 10: |
|
file_list.append([ |
|
InlineKeyboardButton("◄", callback_data="N/A"), |
|
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}", |
|
callback_data="N/A"), |
|
InlineKeyboardButton("►", callback_data="userfiles_2") |
|
], ) |
|
if not file_list: |
|
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) |
|
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) |
|
await message.reply_photo(photo=Telegram.FILE_PIC, |
|
caption="Total files: {}".format(total_files), |
|
reply_markup=InlineKeyboardMarkup(file_list)) |
|
|
|
|
|
|
|
@FileStream.on_message((filters.command('filebank') & filters.private) | (filters.regex("ꜰɪʟᴇ ʙᴀɴᴋ") & filters.private)) |
|
@verify_users |
|
async def my_filebank(bot: Client, message: Message, response): |
|
|
|
if await db.is_admin(message.from_user.id): |
|
user_files, total_files = await db.find_all_files([1, 10]) |
|
else: |
|
user_files, total_files = await db.find_all_public_files([1, 10]) |
|
|
|
file_list = [] |
|
async for x in user_files: |
|
file_list.append([ |
|
InlineKeyboardButton(f"📦 {x['file']['caption']}",callback_data=f"allfile_{x['_id']}_{1}")]) |
|
if total_files > 10: |
|
file_list.append([ |
|
InlineKeyboardButton("◄", callback_data="N/A"), |
|
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"), |
|
InlineKeyboardButton("►", callback_data="userallfiles_2") |
|
], ) |
|
if not file_list: |
|
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], ) |
|
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")]) |
|
|
|
await message.reply_photo(photo=Telegram.FILE_PIC, |
|
caption="Total files: {}".format(total_files), |
|
reply_markup=InlineKeyboardMarkup(file_list)) |
|
|
|
|
|
@FileStream.on_inline_query() |
|
async def handle_inline_query(client, query): |
|
results = [] |
|
if '|' in query.query: |
|
text, file_type = query.query.split('|', maxsplit=1) |
|
text = text.strip() |
|
file_type = file_type.strip().lower() |
|
else: |
|
text = query.query.strip() |
|
file_type = None |
|
offset = int(query.offset or 0) |
|
files, next_offset = await db.get_search_results(text,file_type=file_type,max_results=10,offset=offset) |
|
for file in files: |
|
results.append( |
|
InlineQueryResultCachedDocument( |
|
title=file['file']['file_name'], |
|
document_file_id=file['file']['file_id'], |
|
caption=file['file']['file_name'] or "", |
|
description=f"Size: {humanbytes(file['file']['file_size'])}\nType:{file['file']['mime_type']} ", |
|
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton('Search',switch_inline_query_current_chat='')]]) |
|
) |
|
) |
|
if results: |
|
switch_pm_text = f"Results" |
|
if text: |
|
switch_pm_text += f" for {text}" |
|
|
|
await query.answer(results=results, cache_time=300, switch_pm_text=str(switch_pm_text), switch_pm_parameter="start", next_offset=str(next_offset) ) |
|
else: |
|
switch_pm_text = f'No results' |
|
if text: |
|
switch_pm_text += f' for "{text}"' |
|
|
|
await query.answer(results=[],cache_time=300,switch_pm_text=switch_pm_text,switch_pm_parameter="okay") |
|
|
|
|