Spaces:
Running
Running
from __future__ import annotations | |
import logging | |
from datetime import datetime | |
from pyrogram import Client | |
from typing import Any, Optional | |
from pyrogram.enums import ParseMode, ChatType | |
from pyrogram.types import Message | |
from pyrogram.file_id import FileId | |
#----------------------------------------------------- | |
from FileStream.bot import FileStream | |
from FileStream.bot import MULTI_CLIENTS | |
from FileStream.Database import Database | |
from FileStream.config import Telegram, Server | |
from FileStream.Tools import Time_ISTKolNow | |
from FileStream.Tools.cleanup import clean_text,clean_string_special | |
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) | |
async def send_file(client: Client, db_id, file_info , message, send_to): | |
file_caption = getattr(message, 'caption', None) or get_name(message) | |
log_msg = await client.send_cached_media(chat_id=send_to, file_id= file_info['file']['file_id'] , caption=f"**{file_caption}**") | |
if message.chat.type == ChatType.PRIVATE: | |
#await log_msg.reply_text( | |
# text= | |
# f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**Uꜱᴇʀ ɪᴅ :** `{message.from_user.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`", | |
# disable_web_page_preview=True, | |
# parse_mode=ParseMode.MARKDOWN, | |
# quote=True) | |
await log_msg.edit_text( | |
text= | |
f"**{file_info['type']} :{file_info['title']} \nReleaseDate {file_info['release_date']} \n**Description{file_info['description']} \n**Genre {file_info['genre']}\n\nRᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.from_user.first_name} \n**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`", | |
disable_web_page_preview=True, | |
parse_mode=ParseMode.MARKDOWN, | |
) | |
else: | |
await log_msg.edit_text( | |
text= | |
f"**{file_info['type']} :{file_info['title']} \nReleaseDate {file_info['release_date']} \n Description{file_info['description']} \n Genre {file_info['genre']}\n\nRᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.chat.title} \n**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`", | |
disable_web_page_preview=True, | |
parse_mode=ParseMode.MARKDOWN, | |
quote=True) | |
return {"message": log_msg, "sent_to": send_to} | |
async def update_file_id(message, MULTI_CLIENTS): | |
file_ids = {} | |
for client_id, client in MULTI_CLIENTS.items(): | |
#log_msg = await client.get_messages(message['location'],message['message_id']) | |
media = get_media_from_message(await client.get_messages(message['location'], message['message_id'])) | |
file_ids[str(client.id)] = getattr(media, "file_id", "") | |
return file_ids | |
#------------------------------------Update Private File IDs------------------------------------# | |
async def get_private_file_ids(client: Client | bool, db_id: str,message) -> Optional[FileId]: | |
file_info = await db.get_private_file(db_id) | |
file_id_info = file_info.setdefault("file_ids", {}) | |
if not str(client.id) in file_id_info: | |
logging.debug("Storing file_id in DB") | |
#msg = await client.get_messages(Telegram.PFLOG_CHANNEL,file_info['message_id']) | |
media = get_media_from_message(await client.get_messages(Telegram.PFLOG_CHANNEL, file_info['message_id'])) | |
file_id_info[str(client.id)] = getattr(media, "file_id", "") | |
await db.update_private_file_ids(db_id, file_id_info) | |
logging.debug("Stored file_id in DB") | |
file_info = await db.get_private_file(db_id) | |
return file_info[str(client.id)] | |
if not client: | |
return file_info['file']['file_id'] | |
#------------------------Update Public File ID s --------------------------- # | |
async def get_file_ids(client: Client | bool, db_id: str, message) -> Optional[FileId]: | |
logging.debug("Starting of get_file_ids") | |
file_info = await db.get_file(db_id) | |
if (not "file_ids" in file_info) or not client: | |
if file_info['user_type'] == "TELEGRAM": | |
if file_info['location'] in Telegram.DATA_SOURCES: | |
print("Already Present in Data Sources ", Telegram.DATA_SOURCES) | |
else: | |
source = Telegram.DFLOG_CHANNEL if file_info['privacy_type']=="PUBLIC" else Telegram.FLOG_CHANNEL | |
log_msg = await send_file(FileStream, db_id, file_info, message, source) | |
#updated_info = update_file_info(log_msg) | |
await db.update_file_info(db_id, update_file_info(log_msg), file_info['privacy_type']) | |
await db.update_file_ids(db_id, await update_file_id(await db.GetFileByDBName(db_id, file_info['privacy_type']), MULTI_CLIENTS), file_info['privacy_type']) | |
logging.debug("Stored file_id of all clients in DB") | |
if not client: | |
return | |
file_info = await db.get_file(db_id) | |
if file_info['user_type'] == "WEB": | |
await db.update_file_ids(db_id, await update_file_id(await db.get_file(db_id), MULTI_CLIENTS)) | |
logging.debug("Stored file_id of all clients in DB") | |
if not client: | |
return | |
file_info = await db.get_file(db_id) | |
file_id_info = file_info.setdefault("file_ids", {}) | |
if not str(client.id) in file_id_info: | |
logging.debug("Storing file_id in DB") | |
#log_msg = await send_file(FileStream, db_id, file_info['file_id'], message,Telegram.FLOG_CHANNEL) | |
msg = await client.get_messages(file_info['location'],file_info['message_id']) | |
media = get_media_from_message(msg) | |
file_id_info[str(client.id)] = getattr(media, "file_id", "") | |
await db.update_file_ids(db_id, file_id_info) | |
logging.debug("Stored file_id in DB") | |
logging.debug("Middle of get_file_ids") | |
file_id = FileId.decode(file_id_info[str(client.id)]) | |
setattr(file_id, "file_size", file_info['file']['file_size']) | |
setattr(file_id, "mime_type", file_info['file']['mime_type']) | |
setattr(file_id, "file_name", file_info['file']['file_name']) | |
setattr(file_id, "unique_id", file_info['file']['file_unique_id']) | |
logging.debug("Ending of get_file_ids") | |
return file_id | |
#------------------------------ | |
def get_media_from_message(message: "Message") -> Any: | |
media_types = ( | |
"audio", | |
"document", | |
"photo", | |
"sticker", | |
"animation", | |
"video", | |
"voice", | |
"video_note", | |
) | |
for attr in media_types: | |
media = getattr(message, attr, None) | |
if media: | |
return media | |
def get_media_file_size(m): | |
media = get_media_from_message(m) | |
return getattr(media, "file_size", "None") | |
def get_name(media_msg: Message | FileId) -> str: | |
if isinstance(media_msg, Message): | |
media = get_media_from_message(media_msg) | |
file_name = getattr(media, "file_name", "") | |
elif isinstance(media_msg, FileId): | |
file_name = getattr(media_msg, "file_name", "") | |
if not file_name: | |
if isinstance(media_msg, Message) and media_msg.media: | |
media_type = media_msg.media.value | |
elif media_msg.file_type: | |
media_type = media_msg.file_type.name.lower() | |
else: | |
media_type = "file" | |
formats = { | |
"photo": "jpg", | |
"audio": "mp3", | |
"voice": "ogg", | |
"video": "mp4", | |
"animation": "mp4", | |
"video_note": "mp4", | |
"sticker": "webp" | |
} | |
ext = formats.get(media_type) | |
ext = "." + ext if ext else "" | |
date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
#file_name = f"{media_type}-{date}{ext}" | |
file_name ="None/Unknown" | |
return file_name | |
def get_file_info(message, instruction): | |
media = get_media_from_message(message) | |
""" | |
//Used a single Liner Insted of this Block | |
if message.chat.type == ChatType.PRIVATE: | |
user_idx = message.from_user.id | |
else: | |
user_idx = message.chat.id | |
""" | |
# Check if message contains a video and extract height/width | |
if hasattr(message, "video") and message.video: | |
quality = f"{message.video.height}x{message.video.width}" | |
duration= float(message.video.duration) | |
else: | |
quality ="Unknown" | |
duration="unknown" | |
#"caption": clean_text(getattr(message, "caption", f"{get_name(message)}")), | |
return { | |
"user_id": instruction.get("user_id", ""), | |
"user_type": instruction.get("user_type",""), | |
"message_id": message.id, | |
"IMDB_id": instruction.get("IMDB_id",""), | |
"poster": instruction.get("poster",""), | |
"title": instruction.get("title",""), | |
"release_date":instruction.get("release_date",""), | |
"type": instruction.get("type",""), | |
"keywords": instruction.get("keywords",""), | |
"description":instruction.get("description",""), | |
"genre":instruction.get("genre",""), | |
"quality":quality, | |
"duration": duration, | |
"location": message.from_user.id if (message.chat.type == ChatType.PRIVATE) else message.chat.id, | |
"file": { | |
"file_id": getattr(media, "file_id", ""), | |
"caption": clean_string_special(clean_text(getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown")), | |
"file_unique_id": getattr(media, "file_unique_id", ""), | |
"file_name": str(get_name(message)), | |
"file_size": getattr(media, "file_size", 0), | |
"mime_type": getattr(media, "mime_type", "None/unknown"), | |
"tagged_users": { | |
str(message.from_user.id) if (message.chat.type == ChatType.PRIVATE) else str(Telegram.OWNER_ID): instruction['privacy_type'] | |
} | |
}, | |
"time": Time_ISTKolNow(), | |
"privacy_type": instruction['privacy_type'] | |
} | |
def update_file_info(updates): | |
print(updates) | |
media = get_media_from_message(updates['message']) | |
return { | |
"message_id": updates['message'].id, | |
"location": updates['sent_to'], | |
"file": { | |
"file_id":getattr(media, "file_id", ""), | |
"caption":clean_text(getattr(updates['message'], "caption",f"{get_name(updates['message'])}")), | |
"file_unique_id":getattr(media, "file_unique_id", ""), | |
"file_name":get_name(updates['message']), | |
"file_size":getattr(media, "file_size", 0), | |
"mime_type":getattr(media, "mime_type", "None/unknown"), | |
"tagged_users": {} | |
}, | |
} | |