|
from __future__ import annotations |
|
import logging |
|
from datetime import datetime |
|
from pyrogram import Client |
|
from typing import Any, Optional |
|
|
|
from pyrogram.enums import ParseMode, ChatType |
|
from pyrogram.types import Message |
|
from pyrogram.file_id import FileId |
|
|
|
from FileStream.bot import FileStream |
|
from FileStream.bot import MULTI_CLIENTS |
|
from FileStream.Database import Database |
|
from FileStream.config import Telegram, Server |
|
from FileStream.Tools import Time_ISTKolNow |
|
from FileStream.Tools.cleanup import clean_text |
|
|
|
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) |
|
|
|
|
|
async def send_file(client: Client, db_id, file_id: str, message, send_to): |
|
file_caption = getattr(message, 'caption', None) or get_name(message) |
|
log_msg = await client.send_cached_media(chat_id=send_to, file_id=file_id, caption=f"**{file_caption}**") |
|
|
|
if message.chat.type == ChatType.PRIVATE: |
|
await log_msg.reply_text( |
|
text= |
|
f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**Uꜱᴇʀ ɪᴅ :** `{message.from_user.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`", |
|
disable_web_page_preview=True, |
|
parse_mode=ParseMode.MARKDOWN, |
|
quote=True) |
|
else: |
|
await log_msg.reply_text( |
|
text= |
|
f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.chat.title} \n**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`", |
|
disable_web_page_preview=True, |
|
parse_mode=ParseMode.MARKDOWN, |
|
quote=True) |
|
|
|
return {"message": log_msg, "sent_to": send_to} |
|
|
|
|
|
async def update_file_id(message, MULTI_CLIENTS): |
|
file_ids = {} |
|
for client_id, client in MULTI_CLIENTS.items(): |
|
|
|
media = get_media_from_message(await client.get_messages(message['location'],message['message_id'])) |
|
file_ids[str(client.id)] = getattr(media, "file_id", "") |
|
|
|
return file_ids |
|
|
|
|
|
|
|
async def get_private_file_ids(client: Client | bool, db_id: str,message) -> Optional[FileId]: |
|
file_info = await db.get_private_file(db_id) |
|
file_id_info = file_info.setdefault("file_ids", {}) |
|
if not str(client.id) in file_id_info: |
|
logging.debug("Storing file_id in DB") |
|
|
|
media = get_media_from_message(await client.get_messages( |
|
Telegram.PFLOG_CHANNEL, file_info['message_id'])) |
|
file_id_info[str(client.id)] = getattr(media, "file_id", "") |
|
await db.update_private_file_ids(db_id, file_id_info) |
|
logging.debug("Stored file_id in DB") |
|
file_info = await db.get_private_file(db_id) |
|
return file_info[str(client.id)] |
|
if not client: |
|
return file_info['file']['file_id'] |
|
|
|
|
|
|
|
async def get_file_ids(client: Client | bool, db_id: str, message) -> Optional[FileId]: |
|
|
|
logging.debug("Starting of get_file_ids") |
|
file_info = await db.get_file(db_id) |
|
|
|
if (not "file_ids" in file_info) or not client: |
|
if file_info['user_type'] == "TELEGRAM": |
|
if file_info['location'] in Telegram.DATA_SOURCES: |
|
print("Already Present in Data Sources ", Telegram.DATA_SOURCES) |
|
else: |
|
log_msg = await send_file(FileStream, db_id,file_info['file']['file_id'], message,Telegram.FLOG_CHANNEL) |
|
|
|
await db.update_file_info(db_id, update_file_info(log_msg)) |
|
await db.update_file_ids(db_id, await update_file_id(await db.get_file(db_id),MULTI_CLIENTS)) |
|
|
|
logging.debug("Stored file_id of all clients in DB") |
|
if not client: |
|
return |
|
file_info = await db.get_file(db_id) |
|
|
|
if file_info['user_type'] == "WEB": |
|
await db.update_file_ids(db_id, await update_file_id(await db.get_file(db_id), MULTI_CLIENTS)) |
|
logging.debug("Stored file_id of all clients in DB") |
|
if not client: |
|
return |
|
file_info = await db.get_file(db_id) |
|
|
|
file_id_info = file_info.setdefault("file_ids", {}) |
|
if not str(client.id) in file_id_info: |
|
logging.debug("Storing file_id in DB") |
|
|
|
msg = await client.get_messages(file_info['location'],file_info['message_id']) |
|
media = get_media_from_message(msg) |
|
file_id_info[str(client.id)] = getattr(media, "file_id", "") |
|
await db.update_file_ids(db_id, file_id_info) |
|
logging.debug("Stored file_id in DB") |
|
|
|
logging.debug("Middle of get_file_ids") |
|
file_id = FileId.decode(file_id_info[str(client.id)]) |
|
setattr(file_id, "file_size", file_info['file']['file_size']) |
|
setattr(file_id, "mime_type", file_info['file']['mime_type']) |
|
setattr(file_id, "file_name", file_info['file']['file_name']) |
|
setattr(file_id, "unique_id", file_info['file']['file_unique_id']) |
|
logging.debug("Ending of get_file_ids") |
|
return file_id |
|
|
|
|
|
|
|
def get_media_from_message(message: "Message") -> Any: |
|
media_types = ( |
|
"audio", |
|
"document", |
|
"photo", |
|
"sticker", |
|
"animation", |
|
"video", |
|
"voice", |
|
"video_note", |
|
) |
|
for attr in media_types: |
|
media = getattr(message, attr, None) |
|
if media: |
|
return media |
|
|
|
|
|
def get_media_file_size(m): |
|
media = get_media_from_message(m) |
|
return getattr(media, "file_size", "None") |
|
|
|
|
|
def get_name(media_msg: Message | FileId) -> str: |
|
if isinstance(media_msg, Message): |
|
media = get_media_from_message(media_msg) |
|
file_name = getattr(media, "file_name", "") |
|
|
|
elif isinstance(media_msg, FileId): |
|
file_name = getattr(media_msg, "file_name", "") |
|
|
|
if not file_name: |
|
if isinstance(media_msg, Message) and media_msg.media: |
|
media_type = media_msg.media.value |
|
elif media_msg.file_type: |
|
media_type = media_msg.file_type.name.lower() |
|
else: |
|
media_type = "file" |
|
|
|
formats = { |
|
"photo": "jpg", |
|
"audio": "mp3", |
|
"voice": "ogg", |
|
"video": "mp4", |
|
"animation": "mp4", |
|
"video_note": "mp4", |
|
"sticker": "webp" |
|
} |
|
|
|
ext = formats.get(media_type) |
|
ext = "." + ext if ext else "" |
|
|
|
date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") |
|
file_name = f"{media_type}-{date}{ext}" |
|
|
|
return file_name |
|
|
|
|
|
def get_file_info(message, instruction): |
|
media = get_media_from_message(message) |
|
""" |
|
//Used a single Liner Insted of this Block |
|
if message.chat.type == ChatType.PRIVATE: |
|
user_idx = message.from_user.id |
|
else: |
|
user_idx = message.chat.id |
|
""" |
|
|
|
return { |
|
"user_id": instruction['user_id'], |
|
"user_type": instruction['user_type'], |
|
"message_id": message.id, |
|
"location": message.from_user.id if (message.chat.type == ChatType.PRIVATE) else message.chat.id, |
|
"file": { |
|
"file_id": getattr(media, "file_id", ""), |
|
"caption": clean_text(getattr(message, "caption", f"{get_name(message)}")), |
|
"file_unique_id": getattr(media, "file_unique_id", ""), |
|
"file_name": get_name(message), |
|
"file_size": getattr(media, "file_size", 0), |
|
"mime_type": getattr(media, "mime_type", "None/unknown"), |
|
"tagged_users": { |
|
str(message.from_user.id) if (message.chat.type == ChatType.PRIVATE) else str(Telegram.OWNER_ID): instruction['privacy_type'] |
|
} |
|
}, |
|
"time":Time_ISTKolNow(), |
|
"privacy_type":instruction['privacy_type'] |
|
} |
|
|
|
|
|
def update_file_info(updates): |
|
media = get_media_from_message(updates['message']) |
|
return { |
|
"message_id": updates['message'].id, |
|
"location": updates['sent_to'], |
|
"file": { |
|
"file_id":getattr(media, "file_id", ""), |
|
"caption":clean_text(getattr(updates['message'], "caption",f"{get_name(updates['message'])}")), |
|
"file_unique_id":getattr(media, "file_unique_id", ""), |
|
"file_name":get_name(updates['message']), |
|
"file_size":getattr(media, "file_size", 0), |
|
"mime_type":getattr(media, "mime_type", "None/unknown"), |
|
"tagged_users": {} |
|
}, |
|
} |
|
|