fsb / FileStream /utils /FileProcessors /file_properties.py
privateone's picture
Update FileStream/utils/FileProcessors/file_properties.py
692c014 verified
from __future__ import annotations
import logging
from datetime import datetime
from pyrogram import Client
from typing import Any, Optional
from pyrogram.enums import ParseMode, ChatType
from pyrogram.types import Message
from pyrogram.file_id import FileId
#-----------------------------------------------------
from FileStream.bot import FileStream
from FileStream.bot import MULTI_CLIENTS
from FileStream.Database import Database
from FileStream.config import Telegram, Server
from FileStream.Tools import Time_ISTKolNow
from FileStream.Tools.cleanup import clean_text,clean_string_special
db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
async def send_file(client: Client, db_id, file_info , message, send_to):
file_caption = getattr(message, 'caption', None) or get_name(message)
log_msg = await client.send_cached_media(chat_id=send_to, file_id= file_info['file']['file_id'] , caption=f"**{file_caption}**")
if message.chat.type == ChatType.PRIVATE:
#await log_msg.reply_text(
# text=
# f"**RᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** [{message.from_user.first_name}](tg://user?id={message.from_user.id})\n**Uꜱᴇʀ ɪᴅ :** `{message.from_user.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`",
# disable_web_page_preview=True,
# parse_mode=ParseMode.MARKDOWN,
# quote=True)
await log_msg.edit_text(
text=
f"**{file_info['type']} :{file_info['title']} \nReleaseDate {file_info['release_date']} \n**Description{file_info['description']} \n**Genre {file_info['genre']}\n\nRᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.from_user.first_name} \n**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
else:
await log_msg.edit_text(
text=
f"**{file_info['type']} :{file_info['title']} \nReleaseDate {file_info['release_date']} \n Description{file_info['description']} \n Genre {file_info['genre']}\n\nRᴇQᴜᴇꜱᴛᴇᴅ ʙʏ :** {message.chat.title} \n**Cʜᴀɴɴᴇʟ ɪᴅ :** `{message.chat.id}`\n**Fɪʟᴇ ɪᴅ :** `{db_id}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
quote=True)
return {"message": log_msg, "sent_to": send_to}
async def update_file_id(message, MULTI_CLIENTS):
file_ids = {}
for client_id, client in MULTI_CLIENTS.items():
#log_msg = await client.get_messages(message['location'],message['message_id'])
media = get_media_from_message(await client.get_messages(message['location'], message['message_id']))
file_ids[str(client.id)] = getattr(media, "file_id", "")
return file_ids
#------------------------------------Update Private File IDs------------------------------------#
async def get_private_file_ids(client: Client | bool, db_id: str,message) -> Optional[FileId]:
file_info = await db.get_private_file(db_id)
file_id_info = file_info.setdefault("file_ids", {})
if not str(client.id) in file_id_info:
logging.debug("Storing file_id in DB")
#msg = await client.get_messages(Telegram.PFLOG_CHANNEL,file_info['message_id'])
media = get_media_from_message(await client.get_messages(Telegram.PFLOG_CHANNEL, file_info['message_id']))
file_id_info[str(client.id)] = getattr(media, "file_id", "")
await db.update_private_file_ids(db_id, file_id_info)
logging.debug("Stored file_id in DB")
file_info = await db.get_private_file(db_id)
return file_info[str(client.id)]
if not client:
return file_info['file']['file_id']
#------------------------Update Public File ID s --------------------------- #
async def get_file_ids(client: Client | bool, db_id: str, message) -> Optional[FileId]:
logging.debug("Starting of get_file_ids")
file_info = await db.get_file(db_id)
if (not "file_ids" in file_info) or not client:
if file_info['user_type'] == "TELEGRAM":
if file_info['location'] in Telegram.DATA_SOURCES:
print("Already Present in Data Sources ", Telegram.DATA_SOURCES)
else:
source = Telegram.DFLOG_CHANNEL if file_info['privacy_type']=="PUBLIC" else Telegram.FLOG_CHANNEL
log_msg = await send_file(FileStream, db_id, file_info, message, source)
#updated_info = update_file_info(log_msg)
await db.update_file_info(db_id, update_file_info(log_msg), file_info['privacy_type'])
await db.update_file_ids(db_id, await update_file_id(await db.GetFileByDBName(db_id, file_info['privacy_type']), MULTI_CLIENTS), file_info['privacy_type'])
logging.debug("Stored file_id of all clients in DB")
if not client:
return
file_info = await db.get_file(db_id)
if file_info['user_type'] == "WEB":
await db.update_file_ids(db_id, await update_file_id(await db.get_file(db_id), MULTI_CLIENTS))
logging.debug("Stored file_id of all clients in DB")
if not client:
return
file_info = await db.get_file(db_id)
file_id_info = file_info.setdefault("file_ids", {})
if not str(client.id) in file_id_info:
logging.debug("Storing file_id in DB")
#log_msg = await send_file(FileStream, db_id, file_info['file_id'], message,Telegram.FLOG_CHANNEL)
msg = await client.get_messages(file_info['location'],file_info['message_id'])
media = get_media_from_message(msg)
file_id_info[str(client.id)] = getattr(media, "file_id", "")
await db.update_file_ids(db_id, file_id_info)
logging.debug("Stored file_id in DB")
logging.debug("Middle of get_file_ids")
file_id = FileId.decode(file_id_info[str(client.id)])
setattr(file_id, "file_size", file_info['file']['file_size'])
setattr(file_id, "mime_type", file_info['file']['mime_type'])
setattr(file_id, "file_name", file_info['file']['file_name'])
setattr(file_id, "unique_id", file_info['file']['file_unique_id'])
logging.debug("Ending of get_file_ids")
return file_id
#------------------------------
def get_media_from_message(message: "Message") -> Any:
media_types = (
"audio",
"document",
"photo",
"sticker",
"animation",
"video",
"voice",
"video_note",
)
for attr in media_types:
media = getattr(message, attr, None)
if media:
return media
def get_media_file_size(m):
media = get_media_from_message(m)
return getattr(media, "file_size", "None")
def get_name(media_msg: Message | FileId) -> str:
if isinstance(media_msg, Message):
media = get_media_from_message(media_msg)
file_name = getattr(media, "file_name", "")
elif isinstance(media_msg, FileId):
file_name = getattr(media_msg, "file_name", "")
if not file_name:
if isinstance(media_msg, Message) and media_msg.media:
media_type = media_msg.media.value
elif media_msg.file_type:
media_type = media_msg.file_type.name.lower()
else:
media_type = "file"
formats = {
"photo": "jpg",
"audio": "mp3",
"voice": "ogg",
"video": "mp4",
"animation": "mp4",
"video_note": "mp4",
"sticker": "webp"
}
ext = formats.get(media_type)
ext = "." + ext if ext else ""
date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
#file_name = f"{media_type}-{date}{ext}"
file_name ="None/Unknown"
return file_name
def get_file_info(message, instruction):
media = get_media_from_message(message)
"""
//Used a single Liner Insted of this Block
if message.chat.type == ChatType.PRIVATE:
user_idx = message.from_user.id
else:
user_idx = message.chat.id
"""
# Check if message contains a video and extract height/width
if hasattr(message, "video") and message.video:
quality = f"{message.video.height}x{message.video.width}"
duration= float(message.video.duration)
else:
quality ="Unknown"
duration="unknown"
#"caption": clean_text(getattr(message, "caption", f"{get_name(message)}")),
return {
"user_id": instruction.get("user_id", ""),
"user_type": instruction.get("user_type",""),
"message_id": message.id,
"IMDB_id": instruction.get("IMDB_id",""),
"poster": instruction.get("poster",""),
"title": instruction.get("title",""),
"release_date":instruction.get("release_date",""),
"type": instruction.get("type",""),
"keywords": instruction.get("keywords",""),
"description":instruction.get("description",""),
"genre":instruction.get("genre",""),
"quality":quality,
"duration": duration,
"location": message.from_user.id if (message.chat.type == ChatType.PRIVATE) else message.chat.id,
"file": {
"file_id": getattr(media, "file_id", ""),
"caption": clean_string_special(clean_text(getattr(message, "caption", f"{get_name(message)}" ) or "None/Unknown")),
"file_unique_id": getattr(media, "file_unique_id", ""),
"file_name": str(get_name(message)),
"file_size": getattr(media, "file_size", 0),
"mime_type": getattr(media, "mime_type", "None/unknown"),
"tagged_users": {
str(message.from_user.id) if (message.chat.type == ChatType.PRIVATE) else str(Telegram.OWNER_ID): instruction['privacy_type']
}
},
"time": Time_ISTKolNow(),
"privacy_type": instruction['privacy_type']
}
def update_file_info(updates):
#print(updates)
media = get_media_from_message(updates['message'])
return {
"message_id": updates['message'].id,
"location": updates['sent_to'],
"file": {
"file_id":getattr(media, "file_id", ""),
"caption":clean_text(getattr(updates['message'], "caption",f"{get_name(updates['message'])}")),
"file_unique_id":getattr(media, "file_unique_id", ""),
"file_name":get_name(updates['message']),
"file_size":getattr(media, "file_size", 0),
"mime_type":getattr(media, "mime_type", "None/unknown"),
"tagged_users": {}
}
}