Spaces:
Build error
Build error
File size: 7,208 Bytes
0cee3b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# ===================================================================== #
# Copyright (c) 2022 Itz-fork #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/> #
# ===================================================================== #
import logging
from time import time
from asyncio import sleep
from typing import Callable
from os import path, remove, stat
from config import Config
from pyrogram import Client
from .caching import STRINGS
from gofile2 import Async_Gofile
from pyrogram.errors import FloodWait
from pyrogram.types import CallbackQuery, Message
from unzipper.database.language import get_language
from unzipper.database.upload_mode import get_upload_mode
from unzipper.helpers_nexa.utils import (TimeFormatter, progress_for_pyrogram,
rm_mark_chars, run_shell_cmds)
class UnzipperBot(Client):
"""
Unzipper bot client
"""
version = "v1.0.2"
def __init__(self):
super().__init__("UnzipperBot",
api_id=Config.APP_ID,
api_hash=Config.API_HASH,
bot_token=Config.BOT_TOKEN,
plugins=dict(root="unzipper/modules"),
sleep_threshold=10)
######## Decorators ########
def handle_erros(self, func: Callable) -> Callable:
"""
Handle erros and database updates of users
"""
async def decorator(client: Client, message: Message):
lang = await get_language(message.chat.id)
try:
await self.check_user(message)
return await func(client, message, STRINGS[lang])
except Exception as e:
logging.warn(e)
await self.send_message(message.chat.id, STRINGS[lang]["failed_main"].format(e))
return decorator
def handle_query(self, func: Callable) -> Callable:
"""
Handle callback queries
"""
async def decorator(client: Client, query: CallbackQuery):
lang = await get_language(query.message.chat.id)
try:
await func(client, query, STRINGS[lang])
except Exception as e:
logging.warn(e)
await self.send_message(query.message.chat.id, STRINGS[lang]["failed_main"].format(e))
return decorator
######## File utils ########
async def send_file(self, c_id: int, doc_f: str, query: CallbackQuery, lang: str = "en", del_status: bool = False):
"""
Send a file to the user
Parameters:
- `c_id` - Chat id
- `doc_f` - File path
- `query` - CallBackQuery object
- `del_status` - Whether if you want to delete progress message or not
"""
try:
# This is my kingdom...
cum = await get_upload_mode(c_id)
# Checks if url file size is bigger than 2GB (Telegram limit)
file_size = stat(doc_f).st_size
if file_size > Config.TG_MAX_SIZE:
# Uploads the file to gofile.io
upmsg = await self.send_message(c_id, text=STRINGS[lang]["alert_file_too_large"])
try:
ga = Async_Gofile()
gfio = await ga.upload(doc_f)
from unzipper import Buttons
await upmsg.edit("**Your file has been uploaded to gofile! Click on the below button to download it π**", reply_markup=await Buttons.make_button("Gofile link π", url=gfio["downloadPage"]))
except:
await upmsg.edit("`Upload failed, Better luck next time π!`")
remove(doc_f)
return
tgupmsg = await self.send_message(c_id, STRINGS[lang]["processing"])
stm = time()
# Uplaod type: Video
if cum == "video":
sthumb = await self.get_or_gen_thumb(c_id, doc_f, True)
vid_duration = await run_shell_cmds(f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {doc_f}")
await self.send_video(
chat_id=c_id,
video=doc_f,
caption="**Extracted by @NexaUnzipper_Bot**",
duration=int(
vid_duration) if vid_duration.isnumeric() else 0,
thumb=sthumb,
progress=progress_for_pyrogram,
progress_args=("**Trying to upload π** \n", tgupmsg, stm))
# Upload type: Document
else:
sthumb = await self.get_or_gen_thumb(c_id, doc_f)
await self.send_document(
chat_id=c_id,
document=doc_f,
caption="**Extracted by @NexaUnzipper_Bot**",
thumb=sthumb,
progress=progress_for_pyrogram,
progress_args=("**Trying to upload π** \n", tgupmsg, stm))
etm = time()
# Delete or edit the progress message
await tgupmsg.delete() if del_status else await tgupmsg.edit(STRINGS[lang]["ok_upload"].format(path.basename(doc_f), TimeFormatter(round(etm - stm))))
# Cleanup
try:
remove(doc_f)
if sthumb:
remove(sthumb)
except:
pass
except FloodWait as f:
await sleep(f.x)
return await self.send_file(c_id, doc_f, query)
except FileNotFoundError:
return await self.answer_query(query, "Sorry! I can't find that file", True)
except BaseException as e:
logging.warn(e)
await self.answer_query(query, f"**Error:** \n`{e}`")
######## Utils ########
async def answer_query(self, query: CallbackQuery, text: str, alert: bool = False, *args, **kwargs):
"""
Answer CallbackQuery with better error handling
Parameters:
- `query` - CallBackQuery object
- `text` - Message text
- `alert` - Pass True if you want to show the message as an alert
"""
try:
if alert:
await query.answer(await rm_mark_chars(text), show_alert=True, *args, **kwargs)
else:
await query.message.edit(text, *args, **kwargs)
except:
try:
await query.message.delete()
except:
pass
await self.send_message(query.message.chat.id, text, *args, **kwargs)
|