# BinaryONe
# DB Changes
# c91f3ea
import math
import asyncio
import logging
from pyrogram import filters, Client
from pyrogram.types import BotCommand
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto
from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo
#----------------------------Local Imports ---------------------#
from FileStream import __version__
from FileStream.bot import FileStream
from FileStream.config import Telegram
from FileStream.Database import Database
from FileStream.Exceptions import FileNotFound
from FileStream.utils.FileProcessors.human_readable import humanbytes
from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users
from FileStream.utils.FileProcessors.translation import LANG, BUTTON
# Module-level database handle; the handlers in this module query it via `db`.
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
#----------------------------search Commands ---------------------#
@FileStream.on_message((filters.command('search') & filters.private) | (filters.regex("ꜱᴇᴀʀᴄʜ") & filters.private))
@verify_users
async def search_files(bot: Client, message: Message, response):
    """Reply with a prompt and a single inline "Search" button.

    Registered for the ``search`` command and for messages matching the
    "ꜱᴇᴀʀᴄʜ" pattern, in private chats only.

    Args:
        bot: The client that received the message (not used here).
        message: The incoming message; the reply is sent to it.
        response: Not used here; presumably supplied by ``verify_users``
            (TODO confirm against the decorator).
    """
    # The button is created with an empty inline query for the current chat.
    search_button = InlineKeyboardButton('Search', switch_inline_query_current_chat='')
    keyboard = InlineKeyboardMarkup([[search_button]])
    await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ", reply_markup=keyboard)
@FileStream.on_message(filters.command("webview"))
@verify_users
async def send_webview_button(bot: Client, message: Message, response):
    """Reply to the ``webview`` command with a button that opens a WebApp.

    Args:
        bot: The client that received the message (not used here).
        message: The incoming message; the reply is sent to it.
        response: Not used here; presumably supplied by ``verify_users``
            (TODO confirm against the decorator).
    """
    # One-row, one-button keyboard; the button carries the WebApp URL.
    open_button = InlineKeyboardButton(
        "Open WebApp",
        web_app=WebAppInfo(url="https://youtube.com"),
    )
    keyboard = InlineKeyboardMarkup([[open_button]])
    await message.reply_text(
        "Click the button below to open the WebApp:",
        reply_markup=keyboard,
    )
#-------------------------Files---------------------------------#
@FileStream.on_message((filters.command('files') & filters.private) | (filters.regex("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ") & filters.private))
@verify_users
async def my_files(bot: Client, message: Message, response):
    """Reply with a photo whose keyboard lists the sender's files.

    Files come from ``db.find_files`` for the sender's user id. Each file
    gets its own button row, followed by an optional pager row, an "empty"
    placeholder when there is nothing to show, and a close button.

    Args:
        bot: The client that received the message (not used here).
        message: The incoming message; its sender id selects the files.
        response: Not used here; presumably supplied by ``verify_users``
            (TODO confirm against the decorator).
    """
    # [1, 10] is presumably the range of the first page -- TODO confirm
    # against Database.find_files.
    cursor, total_files = await db.find_files(message.from_user.id, [1, 10])

    # One single-button row per file returned by the query.
    rows = [
        [InlineKeyboardButton(f"📦 {doc['file']['caption']}", callback_data=f"myfile_{doc['_id']}_{1}")]
        async for doc in cursor
    ]

    # More than ten files: add a pager row showing "1/<page count>".
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        pager_row = [
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ]
        rows.append(pager_row)

    # Nothing was added so far: show a placeholder row instead.
    if not rows:
        rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(rows),
    )
# -----------------------------Bot Private Files Command -----------------------------------------------#
@FileStream.on_message((filters.command('myfiles') & filters.private) | (filters.regex("ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ") & filters.private))
@verify_users
async def my_privfiles(bot: Client, message: Message, response):
    """Reply with a photo whose keyboard lists the sender's private files.

    Files come from ``db.find_private_files`` for the sender's user id. Each
    file gets its own button row, followed by an optional pager row, an
    "empty" placeholder when there is nothing to show, and a close button.

    Args:
        bot: The client that received the message (not used here).
        message: The incoming message; its sender id selects the files.
        response: Not used here; presumably supplied by ``verify_users``
            (TODO confirm against the decorator).
    """
    # [1, 10] is presumably the range of the first page -- TODO confirm
    # against Database.find_private_files.
    sender_id = message.from_user.id
    cursor, total_files = await db.find_private_files(sender_id, [1, 10])

    # One single-button row per file returned by the query.
    rows = [
        [InlineKeyboardButton(f" 📦 {doc['file']['caption']}", callback_data=f"myprivfile_{doc['_id']}_{1}")]
        async for doc in cursor
    ]

    # More than ten files: add a pager row showing "1/<page count>".
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        # NOTE(review): the "next" button uses the same "userfiles_2" callback
        # data as the /files listing; confirm the callback handler is meant to
        # serve private files too, or whether this is a copy-paste slip.
        pager_row = [
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ]
        rows.append(pager_row)

    # Nothing was added so far: show a placeholder row instead.
    if not rows:
        rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(rows),
    )
# -----------------------------Bot All Available Files Command ----------------------------------------#
@FileStream.on_message((filters.command('filebank') & filters.private) | (filters.regex("ꜰɪʟᴇ ʙᴀɴᴋ") & filters.private))
@verify_users
async def my_filebank(bot: Client, message: Message, response):
    """Reply with a photo whose keyboard lists files from the whole bank.

    When ``db.is_admin`` is true for the sender the listing comes from
    ``db.find_all_files``; otherwise it comes from
    ``db.find_all_public_files``. Each file gets its own button row, followed
    by an optional pager row, an "empty" placeholder when there is nothing to
    show, and a close button.

    Args:
        bot: The client that received the message (not used here).
        message: The incoming message; its sender id drives the admin check.
        response: Not used here; presumably supplied by ``verify_users``
            (TODO confirm against the decorator).
    """
    # Choose the query by the sender's admin status, then run it once.
    # [1, 10] is presumably the range of the first page -- TODO confirm.
    finder = db.find_all_files if await db.is_admin(message.from_user.id) else db.find_all_public_files
    cursor, total_files = await finder([1, 10])

    # One single-button row per file returned by the query.
    rows = [
        [InlineKeyboardButton(f"📦 {doc['file']['caption']}", callback_data=f"allfile_{doc['_id']}_{1}")]
        async for doc in cursor
    ]

    # More than ten files: add a pager row showing "1/<page count>".
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        pager_row = [
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userallfiles_2"),
        ]
        rows.append(pager_row)

    # Nothing was added so far: show a placeholder row instead.
    if not rows:
        rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(rows),
    )
@FileStream.on_inline_query()
async def handle_inline_query(client, query):
    """Answer an inline query with cached-document results from the database.

    The query string may have the form ``"<text>|<file type>"``. Everything
    before the first ``|`` is the search text; everything after it is
    stripped, lower-cased and passed on as the file type. Without a ``|`` the
    whole stripped query is the search text and the file type is ``None``.

    Args:
        client: The client that received the inline query (not used here).
        query: The inline query; its ``query`` and ``offset`` attributes are
            read and it is answered via ``query.answer``.
    """
    # partition() splits on the first '|' only; an empty separator means the
    # query contained no '|' at all.
    text_part, separator, type_part = query.query.partition('|')
    text = text_part.strip()
    file_type = type_part.strip().lower() if separator else None

    # An empty offset is treated as 0.
    offset = int(query.offset or 0)
    files, next_offset = await db.get_search_results(
        text, file_type=file_type, max_results=10, offset=offset
    )

    # Build one cached-document result per file, each with its own keyboard.
    results = [
        InlineQueryResultCachedDocument(
            title=entry['file']['file_name'],
            document_file_id=entry['file']['file_id'],
            caption=entry['file']['file_name'] or "",
            description=f"Size: {humanbytes(entry['file']['file_size'])}\nType:{entry['file']['mime_type']} ",
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton('Search', switch_inline_query_current_chat='')]]),
        )
        for entry in files
    ]

    # Nothing matched: answer with an empty result list and a "No results"
    # header, then stop.
    if not results:
        header = 'No results'
        if text:
            header += f' for "{text}"'
        await query.answer(
            results=[],
            cache_time=300,
            switch_pm_text=header,
            switch_pm_parameter="okay",
        )
        return

    header = "Results"
    if text:
        header += f" for {text}"
    await query.answer(
        results=results,
        cache_time=300,
        switch_pm_text=header,
        switch_pm_parameter="start",
        next_offset=str(next_offset),
    )
#await inline_query.answer(results=result)