Spaces:
Running
Running
import asyncio
import logging
import mimetypes
import traceback
from hashlib import md5
from typing import Dict, Union, BinaryIO, Callable

import urllib3
#------------------------------------------------------
from pyrogram.session import Session
from pyrogram.session import Auth
from pyrogram.errors import FloodWait
from pyrogram.errors import AuthBytesInvalid
from pyrogram.enums.parse_mode import ParseMode
from pyrogram import filters, types, Client, raw
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES, ThumbnailSource
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
#----------------------------------------------
from FileStream.bot import FileStream, MULTI_CLIENTS, WORK_LOADS
from FileStream.config import Telegram, Server
from FileStream.Database import Database
from FileStream.utils.FileProcessors.translation import LANG, BUTTON
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info
# Shared database handle used by the uploader below.
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
# Module-level scratch dict; upload_web_file stores a fresh md5 object under
# the 'md5_sum' key. NOTE(review): it is shared by every upload in the process.
Hash = {}
class TeleUploader:
    """Upload web-submitted file chunks to Telegram through a Pyrogram client.

    The instance wraps one ``Client`` and keeps a ``FileId`` cache that a
    background task empties every ``clean_timer`` seconds.
    """

    def __init__(self, client: "Client"):
        """Bind the uploader to *client* and schedule the periodic cache purge.

        Must be called while an event loop is running: ``asyncio.create_task``
        raises ``RuntimeError`` when there is no running loop.
        """
        self.clean_timer = 30 * 60  # seconds between cache purges
        self.client: "Client" = client
        self.cached_file_ids: Dict[str, "FileId"] = {}
        # The event loop keeps only a weak reference to tasks; hold a strong
        # one so the purge loop cannot be garbage collected while sleeping.
        self._clean_task = asyncio.create_task(self.clean_cache())

    async def clean_cache(self) -> None:
        """Empty ``cached_file_ids`` every ``clean_timer`` seconds, forever."""
        while True:
            await asyncio.sleep(self.clean_timer)
            print("** Caches Cleared :", self.cached_file_ids)
            self.cached_file_ids.clear()
            print("Cleaned the cache")
            logging.debug("Cleaned the cache")

    async def get_me(self):
        """Return the ``username`` of the account behind the wrapped client."""
        # Await the coroutine first; ``await client.get_me().username`` would
        # look up ``username`` on the un-awaited coroutine object and fail.
        me = await self.client.get_me()
        return me.username

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> "FileId":
        """Return the cached ``FileId`` for *db_id*, generating it on a miss.

        On a cache miss this delegates to ``self.generate_file_properties``,
        which is expected to fill ``cached_file_ids[db_id]``.
        NOTE(review): ``generate_file_properties`` is not defined in the part
        of the class visible here; confirm it exists elsewhere.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("Before Calling generate_file_properties")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_media_session(self, client: "Client", file_id: "FileId") -> "Session":
        """Return a started media ``Session`` for the DC that holds *file_id*.

        A session already stored in ``client.media_sessions`` is reused.
        Otherwise a new one is created, started and stored there. For a
        foreign DC the authorization is exported from the home DC and
        imported into the new session, retrying up to six times.

        Raises:
            AuthBytesInvalid: if every import attempt is rejected.
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)
        if media_session is not None:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
            return media_session

        test_mode = await client.storage.test_mode()
        if file_id.dc_id != await client.storage.dc_id():
            media_session = Session(
                client,
                file_id.dc_id,
                await Auth(client, file_id.dc_id, test_mode).create(),
                test_mode,
                is_media=True,
            )
            await media_session.start()

            for _ in range(6):
                exported_auth = await client.invoke(
                    raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
                try:
                    await media_session.invoke(
                        raw.functions.auth.ImportAuthorization(
                            id=exported_auth.id, bytes=exported_auth.bytes))
                    break
                except AuthBytesInvalid:
                    logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                    continue
            else:
                # Every attempt was rejected: give up and release the session.
                await media_session.stop()
                raise AuthBytesInvalid
        else:
            media_session = Session(
                client,
                file_id.dc_id,
                await client.storage.auth_key(),
                test_mode,
                is_media=True,
            )
            await media_session.start()

        logging.debug(f"Created media session for DC {file_id.dc_id}")
        client.media_sessions[file_id.dc_id] = media_session
        return media_session

    def mime(self, filename):
        """Return the MIME type guessed from *filename*, or ``None`` if unknown."""
        return mimetypes.guess_type(filename)[0]

    async def gen_session(self):
        """Build (but do not start) a media ``Session`` on the client's own DC."""
        client = self.client
        return Session(
            client,
            await client.storage.dc_id(),
            await client.storage.auth_key(),
            await client.storage.test_mode(),
            is_media=True,
        )

    @staticmethod
    def _first_message_id(result):
        """Return the id of the first message carried by ``result.updates``."""
        for update in getattr(result, "updates", None) or ():
            message_id = getattr(getattr(update, "message", None), "id", None)
            if message_id is not None:
                return message_id
        raise ValueError("SendMedia response did not contain a message id")

    async def upload_web_file(self, file_details, chunk):
        """Upload one chunk of a web upload and publish the file after the last.

        Args:
            file_details: mapping with a ``"file"`` sub-mapping providing
                ``file_size``, ``file_part``, ``total_parts``, ``file_name``
                and ``mime_type``, plus top-level ``user_id`` and
                ``dropzone_id`` (read only when the last part is published).
                ``file_details["file"]["file_id"]`` is overwritten with a new
                random id on every call.
            chunk: raw bytes of this part.

        Returns:
            A dict with ``status`` and ``message`` for intermediate parts or
            failures. After the last part is published successfully, the
            value returned by ``gen_link`` is returned instead.

        Raises:
            ValueError: if the file is empty or exceeds the account limit
                (4000 MiB for premium accounts, 2000 MiB otherwise).
        """
        client = self.client
        file_details["file"]["file_id"] = client.rnd_id()
        response = dict(status="ok", message="ok")

        if file_details["file"]["file_size"] == 0:
            raise ValueError("File size equals to 0 B")

        file_size_limit_mib = 4000 if client.me.is_premium else 2000
        if file_details["file"]["file_size"] > file_size_limit_mib * 1024 * 1024:
            raise ValueError(f"Can't upload files bigger than {file_size_limit_mib} MiB")

        # Anything larger than one byte goes through the big-file RPCs.
        is_big = file_details["file"]["file_size"] > 1

        session = await self.gen_session()
        await session.start()
        try:
            # The id used for every RPC is the one the database hands back.
            stored = await db.add_webfile(file_details)
            info = file_details["file"]
            upload_id = stored["file"]["file_id"]
            part_index = info["file_part"]
            total_parts = info["total_parts"]

            if not is_big or part_index == 0:
                await client.send_message(
                    chat_id=Telegram.ULOG_GROUP,
                    text="Hi, I am just Started, Do Not Disturb Please",
                    disable_web_page_preview=True)
                Hash['md5_sum'] = md5()

            try:
                if is_big:
                    rpc = raw.functions.upload.SaveBigFilePart(
                        file_id=upload_id,
                        file_part=part_index,
                        file_total_parts=total_parts,
                        bytes=chunk)
                    response['status'] = "success"
                    response['message'] = f"Uploading as Bigfile {part_index}/{total_parts}"
                    print("Response", response)
                else:
                    rpc = raw.functions.upload.SaveFilePart(
                        file_id=upload_id,
                        file_part=part_index,
                        bytes=chunk)
                await session.invoke(rpc)
            except (TimeoutError, AttributeError) as exc:
                # Keep going as before, but stop reporting a lost part as success.
                logging.warning("Uploading part %s/%s failed: %r", part_index, total_parts, exc)
                response['status'] = "error"
                response['message'] = f"Failed to upload part {part_index}/{total_parts}: {exc}"

            if part_index == total_parts - 1:
                print("Final Function")
                if is_big:
                    final = raw.types.InputFileBig(
                        id=upload_id,
                        parts=total_parts,
                        name=info["file_name"],
                    )
                else:
                    # The shared digest is reset on every non-big call, so it
                    # only ever covers this chunk; hash it locally instead of
                    # reading the module-level object other uploads may replace.
                    final = raw.types.InputFile(
                        id=upload_id,
                        parts=total_parts,
                        name=info["file_name"],
                        md5_checksum=md5(chunk).hexdigest(),
                    )
                media = raw.types.InputMediaUploadedDocument(
                    file=final,
                    mime_type=info["mime_type"],
                    attributes=[raw.types.DocumentAttributeFilename(file_name=info["file_name"])])
                try:
                    msgs = await client.invoke(
                        raw.functions.messages.SendMedia(
                            peer=await client.resolve_peer(Telegram.FLOG_CHANNEL),
                            media=media,
                            message=info["file_name"],
                            random_id=upload_id))
                    message = await FileStream.send_message(Telegram.ULOG_GROUP, "Message sent with **Pyrogram**!")
                    message_id = self._first_message_id(msgs)
                    print("Printing msg-id", message_id)
                    chat_id = Telegram.FLOG_CHANNEL
                    print("Printing ", message_id, chat_id)
                    MessageFile = await FileStream.get_messages(chat_id, message_id)
                    instruction = {
                        "privacy_type": "PRIVATE",
                        "user_id": file_details["user_id"],
                        "user_type": "WEB"
                    }
                    file_info = get_file_info(MessageFile, instruction)
                    print("Printing file_info", file_info)
                    await db.uploaded_web_file(file_details["dropzone_id"])
                    # Register the published file, then build its links.
                    inserted_id = await db.add_file(file_info=file_info, db_type="TEMPORARY")
                    await get_file_ids(False, inserted_id, MessageFile)
                    response = await gen_link(_id=inserted_id)
                    await message.edit_text(
                        text=response["stream_text"],
                        parse_mode=ParseMode.HTML,
                        disable_web_page_preview=True,
                        reply_markup=response["reply_markup"],
                    )
                except Exception as e:
                    await client.send_message(chat_id=Telegram.ULOG_GROUP,
                                              text=f"**#EʀʀᴏʀTʀᴀᴄᴋᴇʙᴀᴄᴋ:** `{e}`",
                                              disable_web_page_preview=True)
                    print(f"Cᴀɴ'ᴛ Eᴅɪᴛ Bʀᴏᴀᴅᴄᴀsᴛ Mᴇssᴀɢᴇ!\nEʀʀᴏʀ: **Gɪᴠᴇ ᴍᴇ ᴇᴅɪᴛ ᴘᴇʀᴍɪssɪᴏɴ ɪɴ ᴜᴘᴅᴀᴛᴇs ᴀɴᴅ ʙɪɴ Cʜᴀɴɴᴇʟ!{traceback.format_exc()}**")
                    # Publishing failed: do not report the upload as successful.
                    response = dict(status="error", message=str(e))
        finally:
            # Always release the upload session, even when a step above raised.
            await session.stop()
        return response