Spaces:
Running
Running
File size: 7,370 Bytes
e566133 cbf6107 e566133 c91f3ea e566133 c91f3ea e566133 c91f3ea e566133 4a17347 e566133 0b8a548 e566133 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import math
import asyncio
import logging
from pyrogram import filters, Client
from pyrogram.types import BotCommand
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto
from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo
#----------------------------Local Imports ---------------------#
from FileStream import __version__
from FileStream.bot import FileStream
from FileStream.config import Telegram
from FileStream.Database import Database
from FileStream.Exceptions import FileNotFound
from FileStream.utils.FileProcessors.human_readable import humanbytes
from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users
from FileStream.utils.FileProcessors.translation import LANG, BUTTON
# Set new commands
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
#----------------------------search Commands ---------------------#
@FileStream.on_message((filters.command('search') & filters.private) | (filters.regex("ꜱᴇᴀʀᴄʜ") & filters.private))
@verify_users
async def search_files(bot: Client, message: Message, response):
await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton(
'Search', switch_inline_query_current_chat='')
]]))
@FileStream.on_message(filters.command("webview"))
@verify_users
async def send_webview_button(bot: Client, message: Message, response):
# Define the inline keyboard with a button that opens a web page
webapp_button = InlineKeyboardMarkup(
[
[InlineKeyboardButton("Open WebApp", web_app=WebAppInfo(url="https://youtube.com"))]
]
)
await message.reply_text(
"Click the button below to open the WebApp:",
reply_markup=webapp_button
)
#-------------------------Files---------------------------------#
@FileStream.on_message((filters.command('files') & filters.private) | (filters.regex("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ") & filters.private))
@verify_users
async def my_files(bot: Client, message: Message, response):
user_files, total_files = await db.find_files(message.from_user.id, [1, 10])
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(f"📦 {x['file']['caption']}", callback_data=f"myfile_{x['_id']}_{1}")
])
if total_files > 10:
file_list.append([
InlineKeyboardButton("◄", callback_data="N/A"),
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"),
InlineKeyboardButton("►", callback_data="userfiles_2")
], )
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], )
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
await message.reply_photo(photo=Telegram.FILE_PIC,
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
# -----------------------------Bot Private Files Command -----------------------------------------------#
@FileStream.on_message((filters.command('myfiles') & filters.private) | (filters.regex("ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ") & filters.private))
@verify_users
async def my_privfiles(bot: Client, message: Message, response):
user_files, total_files = await db.find_private_files(
message.from_user.id, [1, 10])
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(f" 📦 {x['file']['caption']}",callback_data=f"myprivfile_{x['_id']}_{1}")
])
if total_files > 10:
file_list.append([
InlineKeyboardButton("◄", callback_data="N/A"),
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",
callback_data="N/A"),
InlineKeyboardButton("►", callback_data="userfiles_2")
], )
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], )
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
await message.reply_photo(photo=Telegram.FILE_PIC,
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
# -----------------------------Bot All Available Files Command ----------------------------------------#
@FileStream.on_message((filters.command('filebank') & filters.private) | (filters.regex("ꜰɪʟᴇ ʙᴀɴᴋ") & filters.private))
@verify_users
async def my_filebank(bot: Client, message: Message, response):
if await db.is_admin(message.from_user.id):
user_files, total_files = await db.find_all_files([1, 10])
else:
user_files, total_files = await db.find_all_public_files([1, 10])
file_list = []
async for x in user_files:
file_list.append([
InlineKeyboardButton(f"📦 {x['file']['caption']}",callback_data=f"allfile_{x['_id']}_{1}")])
if total_files > 10:
file_list.append([
InlineKeyboardButton("◄", callback_data="N/A"),
InlineKeyboardButton(f"1/{math.ceil(total_files / 10)}",callback_data="N/A"),
InlineKeyboardButton("►", callback_data="userallfiles_2")
], )
if not file_list:
file_list.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")], )
file_list.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])
await message.reply_photo(photo=Telegram.FILE_PIC,
caption="Total files: {}".format(total_files),
reply_markup=InlineKeyboardMarkup(file_list))
@FileStream.on_inline_query()
async def handle_inline_query(client, query):
results = []
if '|' in query.query:
text, file_type = query.query.split('|', maxsplit=1)
text = text.strip()
file_type = file_type.strip().lower()
else:
text = query.query.strip()
file_type = None
offset = int(query.offset or 0)
files, next_offset = await db.get_search_results(text,file_type=file_type,max_results=10,offset=offset)
for file in files:
results.append(
InlineQueryResultCachedDocument(
title=file['file']['file_name'],
document_file_id=file['file']['file_id'],
caption=file['file']['file_name'] or "",
description=f"Size: {humanbytes(file['file']['file_size'])}\nType:{file['file']['mime_type']} ",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton('Search',switch_inline_query_current_chat='')]])
)
)
if results:
switch_pm_text = f"Results"
if text:
switch_pm_text += f" for {text}"
await query.answer(results=results, cache_time=300, switch_pm_text=str(switch_pm_text), switch_pm_parameter="start", next_offset=str(next_offset) )
else:
switch_pm_text = f'No results'
if text:
switch_pm_text += f' for "{text}"'
await query.answer(results=[],cache_time=300,switch_pm_text=switch_pm_text,switch_pm_parameter="okay")
#await inline_query.answer(results=result)
|