import urllib3
import asyncio
import logging
import traceback
from hashlib import md5
from typing import Dict, Union, BinaryIO, Callable

from pyrogram.session import Session, Auth
from pyrogram.errors import FloodWait, AuthBytesInvalid
from pyrogram.enums.parse_mode import ParseMode
from pyrogram import filters, types, Client, raw
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES, ThumbnailSource
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery

from FileStream.bot import FileStream, MULTI_CLIENTS, WORK_LOADS
from FileStream.config import Telegram, Server
from FileStream.Database import Database

from FileStream.utils.FileProcessors.translation import LANG, BUTTON
from FileStream.utils.FileProcessors.bot_utils import gen_link, priv_func, gen_priv_file_link
from FileStream.utils.FileProcessors.file_properties import get_file_ids, get_file_info

db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)

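# Module-level scratch state shared across chunked web uploads; upload_web_file
# seeds an md5 accumulator here under the "md5_sum" key when the first part arrives.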
Hash = {}


class TeleUploader:

    def __init__(self, client: Client):
        self.clean_timer = 30 * 60
        self.client: Client = client
        self.cached_file_ids: Dict[str, FileId] = {}
        asyncio.create_task(self.clean_cache())

    async def clean_cache(self) -> None:
        """
        Periodically clears the cached file properties to reduce memory usage.
        """
        while True:
            await asyncio.sleep(self.clean_timer)
            print("** Caches Cleared :", self.cached_file_ids)
            self.cached_file_ids.clear()
            print("Cleaned the cache")
            logging.debug("Cleaned the cache")

    async def get_me(self):
        return (await self.client.get_me()).username

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Returns the properties of the media in a specific message as a FileId object.
        If the properties are cached, the cached result is returned;
        otherwise they are generated from the message ID and cached.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("Before calling generate_file_properties")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]
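    # Note: generate_file_properties is referenced above but not defined in this
    # file; it is assumed to be provided elsewhere on this class.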

    async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
        """
        Generates the media session for the DC that contains the media file.
        This is required for getting the bytes from Telegram servers.
        """

        media_session = client.media_sessions.get(file_id.dc_id, None)

        if media_session is None:
            if file_id.dc_id != await client.storage.dc_id():
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

                for _ in range(6):
                    exported_auth = await client.invoke(
                        raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))

                    try:
                        await media_session.invoke(
                            raw.functions.auth.ImportAuthorization(id=exported_auth.id, bytes=exported_auth.bytes))
                        break
                    except AuthBytesInvalid:
                        logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                        continue
                else:
                    await media_session.stop()
                    raise AuthBytesInvalid
            else:
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await client.storage.auth_key(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()
            logging.debug(f"Created media session for DC {file_id.dc_id}")
            client.media_sessions[file_id.dc_id] = media_session
        else:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return media_session
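    # A minimal sketch (an assumption, not used in this file) of how the media
    # session returned above is typically consumed to pull file bytes via the
    # raw upload.GetFile call; `location` would be built from the cached FileId:
    #
    #   session = await self.generate_media_session(self.client, file_id)
    #   result = await session.invoke(
    #       raw.functions.upload.GetFile(location=location, offset=0, limit=1024 * 1024)
    #   )
    #   if isinstance(result, raw.types.upload.File):
    #       chunk = result.bytes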

    def mime(self, filename):
        import mimetypes
        mime_type, encoding = mimetypes.guess_type(filename)

        return mime_type
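    # e.g. mime("video.mp4") returns "video/mp4"; unknown extensions yield None.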

    async def gen_session(self):
        client = self.client

        session = Session(client,
                          await client.storage.dc_id(),
                          await client.storage.auth_key(),
                          await client.storage.test_mode(),
                          is_media=True)

        return session
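    # Unlike generate_media_session, this always builds a media session on the
    # bot's own DC; upload_web_file uses it below to invoke SaveBigFilePart.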

    async def upload_web_file(self, file_details, chunk):
        client = self.client
        # file_name, file_size, file_part, total_parts and upload_id are not
        # unpacked into locals; they are read straight from the file_details dict.
        file_details['file']["file_id"] = client.rnd_id()

        response = dict(status="ok", message="ok")
        if file_details["file"]["file_size"] == 0:
            raise ValueError("File size equals 0 B")

        file_size_limit_mib = 4000 if client.me.is_premium else 2000

        if file_details["file"]["file_size"] > file_size_limit_mib * 1024 * 1024:
            raise ValueError(f"Can't upload files bigger than {file_size_limit_mib} MiB")

        # Every non-empty upload is treated as "big", since only the
        # SaveBigFilePart path is implemented below.
        is_big = file_details["file"]["file_size"] > 1
        session = await self.gen_session()
        await session.start()

        file = await db.add_webfile(file_details)

        if not is_big or file_details["file"]["file_part"] == 0:
            await client.send_message(
                chat_id=Telegram.ULOG_GROUP,
                text="Hi, I have just started. Do not disturb, please.",
                disable_web_page_preview=True)
            Hash['md5_sum'] = md5()

        try:
            if is_big:
                rpc = raw.functions.upload.SaveBigFilePart(
                    file_id=file['file']["file_id"],
                    file_part=file_details["file"]["file_part"],
                    file_total_parts=file_details["file"]["total_parts"],
                    bytes=chunk)
                response['status'] = "success"
                response['message'] = f"Uploading as Bigfile {file_details['file']['file_part']}/{file_details['file']['total_parts']}"
                print("Response", response)

                await session.invoke(rpc)

        except (TimeoutError, AttributeError):
            pass

        if file_details['file']["file_part"] == file_details['file']["total_parts"] - 1:
            # Last part received: assemble the uploaded parts into a document,
            # send it to the log channel and register it in the database.
            print("Final Function")
            if is_big:
                final = raw.types.InputFileBig(
                    id=file['file']["file_id"],
                    parts=file_details['file']["total_parts"],
                    name=file_details['file']["file_name"],
                )

                media = raw.types.InputMediaUploadedDocument(
                    file=final,
                    mime_type=file_details['file']["mime_type"],
                    attributes=[raw.types.DocumentAttributeFilename(file_name=file_details['file']["file_name"])])

                try:
                    msgs = await client.invoke(
                        raw.functions.messages.SendMedia(
                            peer=await client.resolve_peer(Telegram.FLOG_CHANNEL),
                            media=media,
                            message=file_details['file']["file_name"],
                            random_id=file['file']["file_id"]))

                    message = await FileStream.send_message(Telegram.ULOG_GROUP, "Message sent with **Pyrogram**!")
                    # Pull the new message's ID out of the raw Updates returned by SendMedia.
                    message_id = getattr(getattr(getattr(msgs, "updates", "")[1], "message", ""), "id", "")

                    print("Printing msg-id", message_id)
                    chat_id = Telegram.FLOG_CHANNEL
                    print("Printing ", message_id, chat_id)
                    MessageFile = await FileStream.get_messages(chat_id, message_id)

                    instruction = {
                        "privacy_type": "PRIVATE",
                        "user_id": file_details["user_id"],
                        "user_type": "WEB"
                    }
                    file_info = get_file_info(MessageFile, instruction)
                    print("Printing file_info", file_info)

                    await db.uploaded_web_file(file_details["dropzone_id"])

                    inserted_id = await db.add_file(file_info=file_info, db_type="TEMPORARY")
                    await get_file_ids(False, inserted_id, MessageFile)
                    reply_markup, stream_text = await gen_link(_id=inserted_id)
                    await message.edit_text(
                        text=stream_text,
                        parse_mode=ParseMode.HTML,
                        disable_web_page_preview=True,
                        reply_markup=reply_markup,
                    )

                except Exception as e:
                    await client.send_message(chat_id=Telegram.ULOG_GROUP,
                                              text=f"**#EʀʀᴏʀTʀᴀᴄᴋᴇʙᴀᴄᴋ:** `{e}`",
                                              disable_web_page_preview=True)
                    print(f"Can't finalize the web upload!\nError: {traceback.format_exc()}")

        await session.stop()

        return response
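
# A minimal usage sketch, assuming a web route that receives the file in chunks.
# The iter_chunks helper, the 512 KiB part size and the exact file_details payload
# are illustrative assumptions only; they are not defined in this module.
#
#   uploader = TeleUploader(FileStream)
#   PART_SIZE = 512 * 1024
#   total_parts = -(-file_size // PART_SIZE)
#   for part, chunk in enumerate(iter_chunks(upload_stream, PART_SIZE)):
#       file_details = {
#           "user_id": user_id,
#           "dropzone_id": dropzone_id,
#           "file": {
#               "file_name": file_name,
#               "file_size": file_size,
#               "mime_type": uploader.mime(file_name) or "application/octet-stream",
#               "file_part": part,
#               "total_parts": total_parts,
#           },
#       }
#       response = await uploader.upload_web_file(file_details, chunk)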