import urllib3
import asyncio
import logging
import traceback
from hashlib import md5
from typing import Dict, Union, BinaryIO, Callable
#------------------------------------------------------
from pyrogram.session import Session, Auth
from pyrogram.errors import FloodWait, AuthBytesInvalid
from pyrogram.enums.parse_mode import ParseMode
from pyrogram import filters, types, Client, raw
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES, ThumbnailSource
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
#----------------------------------------------
from FileStream.bot import FileStream, MULTI_CLIENTS, WORK_LOADS
from FileStream.config import Telegram, Server
from FileStream.Database import Database
from FileStream.utils.FileProcessors.translation import LANG, BUTTON
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info

db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)

# Module-level scratch space; upload_web_file stores a fresh md5 object under
# the "md5_sum" key when an upload starts.
Hash = {}


class TeleUploader:
    """Upload web-submitted file chunks to Telegram through a pyrogram client.

    The instance also keeps an in-memory cache of ``FileId`` objects keyed by
    database id, which a background task empties every ``clean_timer`` seconds.
    """

    def __init__(self, client: Client):
        """Store *client* and schedule the periodic cache cleaner.

        Must be called while an event loop is running, because it schedules
        a task with ``asyncio.create_task``.
        """
        self.clean_timer = 30 * 60  # seconds between cache wipes
        self.client: Client = client
        self.cached_file_ids: Dict[str, FileId] = {}
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected mid-run.
        self._clean_task = asyncio.create_task(self.clean_cache())

    async def clean_cache(self) -> None:
        """Empty ``cached_file_ids`` every ``clean_timer`` seconds, forever."""
        while True:
            await asyncio.sleep(self.clean_timer)
            print("** Caches Cleared :", self.cached_file_ids)
            self.cached_file_ids.clear()
            print("Cleaned the cache")
            logging.debug("Cleaned the cache")

    async def get_me(self):
        """Return the username of the account behind ``self.client``."""
        # Await the coroutine first; ``.username`` lives on its result, not on
        # the coroutine object itself.
        me = await self.client.get_me()
        return me.username

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """Return the cached ``FileId`` for *db_id*, generating it on a miss.

        On a cache miss ``generate_file_properties`` is awaited and is expected
        to populate ``self.cached_file_ids[db_id]``.

        Raises:
            KeyError: if the entry is still missing after generation.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("Before Calling generate_file_properties")
            # NOTE(review): generate_file_properties is not defined in the
            # visible part of this class - confirm it is provided elsewhere
            # (subclass/mixin), otherwise this raises AttributeError.
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
        """Return a started media session for the DC that holds *file_id*.

        Sessions are cached per DC in ``client.media_sessions``. When the file
        lives on a different DC than the client's own, a new auth key is
        created and the client's authorization is exported/imported into the
        new session (up to 6 attempts).

        Raises:
            AuthBytesInvalid: if every import attempt is rejected.
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)

        if media_session is not None:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
            return media_session

        if file_id.dc_id != await client.storage.dc_id():
            media_session = Session(
                client,
                file_id.dc_id,
                await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
                await client.storage.test_mode(),
                is_media=True,
            )
            await media_session.start()

            for _ in range(6):
                exported_auth = await client.invoke(
                    raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
                try:
                    await media_session.invoke(
                        raw.functions.auth.ImportAuthorization(
                            id=exported_auth.id, bytes=exported_auth.bytes))
                    break
                except AuthBytesInvalid:
                    logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                    continue
            else:
                # Loop finished without a successful import.
                await media_session.stop()
                raise AuthBytesInvalid
        else:
            # Same DC as the client: reuse the stored auth key.
            media_session = Session(
                client,
                file_id.dc_id,
                await client.storage.auth_key(),
                await client.storage.test_mode(),
                is_media=True,
            )
            await media_session.start()

        logging.debug(f"Created media session for DC {file_id.dc_id}")
        client.media_sessions[file_id.dc_id] = media_session
        return media_session

    def mime(self, filename):
        """Return the MIME type guessed from *filename*, or ``None``."""
        import mimetypes
        mime_type, _encoding = mimetypes.guess_type(filename)
        return mime_type

    async def gen_session(self):
        """Build (but do not start) a media session on the client's own DC."""
        client = self.client
        return Session(
            client,
            await client.storage.dc_id(),
            await client.storage.auth_key(),
            await client.storage.test_mode(),
            is_media=True,
        )

    async def upload_web_file(self, file_details, chunk):
        """Upload one chunk of a web upload and finalize on the last chunk.

        Args:
            file_details: mapping read via ``file_details["file"]`` (keys
                ``file_size``, ``file_part``, ``total_parts``, ``file_name``,
                ``mime_type``) plus ``user_id`` and ``dropzone_id``. Its
                ``["file"]["file_id"]`` entry is overwritten with a fresh
                random id on every call.
            chunk: the bytes of this part.

        Returns:
            dict with ``status`` and ``message`` keys. ``status`` is
            ``"success"`` after the part was saved, ``"error"`` if saving the
            part timed out, and ``"ok"`` if no part was sent.

        Raises:
            ValueError: if the file size is 0 or exceeds the account limit.

        On the last part (``file_part == total_parts - 1``) the file is sent
        to ``Telegram.FLOG_CHANNEL``, registered in the database and a link
        message is posted to ``Telegram.ULOG_GROUP``. Failures in that step
        are reported to the log group instead of being raised.
        """
        client = self.client

        file_details['file']["file_id"] = client.rnd_id()
        response = dict(status="ok", message="ok")

        if file_details["file"]["file_size"] == 0:
            raise ValueError("File size equals to 0 B")

        file_size_limit_mib = 4000 if client.me.is_premium else 2000
        if file_details["file"]["file_size"] > file_size_limit_mib * 1024 * 1024:
            raise ValueError(f"Can't upload files bigger than {file_size_limit_mib} MiB")

        # Every file larger than one byte goes through the big-file API.
        # NOTE(review): an earlier variant used a 10 * 1024 * 1024 threshold;
        # confirm the "> 1" cut-off is intentional.
        is_big = file_details["file"]["file_size"] > 1

        session = await self.gen_session()
        await session.start()
        try:
            file = await db.add_webfile(file_details)
            meta = file_details["file"]
            part_no = meta["file_part"]
            total_parts = meta["total_parts"]

            if not is_big or part_no == 0:
                await client.send_message(
                    chat_id=Telegram.ULOG_GROUP,
                    text="Hi, I am just Started, Do Not Disturb Please",
                    disable_web_page_preview=True)
                Hash['md5_sum'] = md5()

            if is_big:
                rpc = raw.functions.upload.SaveBigFilePart(
                    file_id=file['file']["file_id"],
                    file_part=part_no,
                    file_total_parts=total_parts,
                    bytes=chunk)
                try:
                    await session.invoke(rpc)
                except (TimeoutError, AttributeError) as exc:
                    # Still best-effort (no raise), but do not claim success
                    # for a part that was not stored.
                    logging.warning("Saving part %s/%s failed: %r",
                                    part_no, total_parts, exc)
                    response['status'] = "error"
                    response['message'] = f"Failed to upload part {part_no}/{total_parts}: {exc!r}"
                else:
                    response['status'] = "success"
                    response['message'] = f"Uploading as Bigfile {part_no}/{total_parts}"
                print("Response", response)

            if part_no == total_parts - 1:
                print("Final Function")
                media = None
                if is_big:
                    final = raw.types.InputFileBig(
                        id=file['file']["file_id"],
                        parts=total_parts,
                        name=meta["file_name"],
                    )
                    media = raw.types.InputMediaUploadedDocument(
                        file=final,
                        mime_type=meta["mime_type"],
                        attributes=[raw.types.DocumentAttributeFilename(file_name=meta["file_name"])])
                try:
                    if media is None:
                        # Only the big-file path uploads any bytes; make the
                        # failure explicit instead of an unbound-name error.
                        raise ValueError("Nothing was uploaded: the non-big upload path is not implemented")

                    msgs = await client.invoke(
                        raw.functions.messages.SendMedia(
                            peer=await client.resolve_peer(Telegram.FLOG_CHANNEL),
                            media=media,
                            message=meta["file_name"],
                            random_id=file['file']["file_id"]))
                    message = await FileStream.send_message(Telegram.ULOG_GROUP, "Message sent with **Pyrogram**!")
                    # The id of the message just sent is read from the second
                    # entry of the returned updates list.
                    message_id = getattr(getattr(getattr(msgs, "updates", "")[1], "message", ""), "id", "")
                    print("Printing msg-id", message_id)
                    chat_id = Telegram.FLOG_CHANNEL
                    print("Printing ", message_id, chat_id)
                    MessageFile = await FileStream.get_messages(chat_id, message_id)
                    instruction = {
                        "privacy_type": "PRIVATE",
                        "user_id": file_details["user_id"],
                        "user_type": "WEB"
                    }
                    file_info = get_file_info(MessageFile, instruction)
                    print("Printing file_info", file_info)
                    await db.uploaded_web_file(file_details["dropzone_id"])
                    # Add the file to the database before generating its link.
                    inserted_id = await db.add_file(file_info=file_info, db_type="TEMPORARY")
                    await get_file_ids(False, inserted_id, MessageFile)
                    reply_markup, stream_text = await gen_link(_id=inserted_id)
                    await message.edit_text(
                        text=stream_text,
                        parse_mode=ParseMode.HTML,
                        disable_web_page_preview=True,
                        reply_markup=reply_markup,
                    )
                except Exception as e:
                    await client.send_message(chat_id=Telegram.ULOG_GROUP, text=f"**#EʀʀᴏʀTʀᴀᴄᴋᴇʙᴀᴄᴋ:** `{e}`", disable_web_page_preview=True)
                    print(f"Cᴀɴ'ᴛ Eᴅɪᴛ Bʀᴏᴀᴅᴄᴀsᴛ Mᴇssᴀɢᴇ!\nEʀʀᴏʀ: **Gɪᴠᴇ ᴍᴇ ᴇᴅɪᴛ ᴘᴇʀᴍɪssɪᴏɴ ɪɴ ᴜᴘᴅᴀᴛᴇs ᴀɴᴅ ʙɪɴ Cʜᴀɴɴᴇʟ!{traceback.format_exc()}**")
        finally:
            # Always release the per-call session, even when a step above
            # raises; otherwise each failed chunk leaks a connection.
            await session.stop()

        return response