File size: 9,920 Bytes
faa9726 e566133 faa9726 8fde8ae f2772b4 faa9726 f2772b4 faa9726 f2772b4 faa9726 1268592 faa9726 3f29913 e566133 a88b8c8 e566133 a88b8c8 dc6febb a88b8c8 26741bb a88b8c8 e566133 a88b8c8 e566133 a88b8c8 e566133 a88b8c8 26741bb a88b8c8 e566133 a88b8c8 e566133 a88b8c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
import os
import time
import math
import asyncio
import logging
import traceback
from aiohttp import web
from typing import Dict, Union,Optional
from pyrogram.types import Message
from pyrogram import Client, utils, raw
from pyrogram.session import Session, Auth
from pyrogram.errors import AuthBytesInvalid
from aiohttp.http_exceptions import BadStatusLine
from pyrogram.file_id import FileId, FileType, ThumbnailSource
#---------------------Local Upload---------------------#
from FileStream.config import Telegram
from .file_properties import get_file_ids
from FileStream.bot import req_client, FileStream
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.utils.FileProcessors.custom_ul import TeleUploader
from FileStream.Exceptions import FileNotFound, InvalidHash
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
class ByteStreamer:
    """Stream Telegram media bytes through a single pyrogram client.

    Each instance wraps one ``Client``, keeps a per-instance cache of
    ``FileId`` objects keyed by database id, and runs a background task that
    empties that cache every ``clean_timer`` seconds.
    """

    def __init__(self, client: Client):
        """Bind the streamer to *client* and start the cache-cleanup task.

        Must be called while an event loop is running, because the cleanup
        task is scheduled with ``asyncio.create_task``.
        """
        self.clean_timer = 30 * 60  # Cache cleanup interval in seconds (30 minutes)
        self.client: Client = client
        self.cached_file_ids: Dict[str, FileId] = {}  # File properties keyed by db_id
        self.last_activity: float = asyncio.get_event_loop().time()  # Loop-clock time of last activity
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._clean_task = asyncio.create_task(self.clean_cache())

    def update_last_activity(self):
        """Update the last activity time to the current event-loop time."""
        self.last_activity = asyncio.get_event_loop().time()

    def get_last_activity(self) -> float:
        """Get the last activity time of this client."""
        return self.last_activity

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Returns the properties of a media file as a FileId, using the cache.

        If ``db_id`` is already cached the cached value is returned; otherwise
        the properties are generated, cached, and then returned.
        ``MULTI_CLIENTS`` is only forwarded to ``generate_file_properties``.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("File properties not cached. Generating properties.")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)  # Generate and cache the file properties
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Generates the properties of a media file and stores them in the cache.

        The lookup is delegated to ``get_file_ids``; its result is cached under
        ``db_id`` and returned. ``MULTI_CLIENTS`` is accepted for interface
        compatibility but is not used here.
        """
        logging.debug("Generating file properties.")
        file_id = await get_file_ids(self.client, db_id, Message)  # Call the method to get the file properties
        logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
        self.cached_file_ids[db_id] = file_id  # Cache the file properties
        logging.debug(f"Cached media file with ID {db_id}")
        return file_id

    async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
        """
        Generates the media session for the DC that contains the media file.
        This is required for getting the bytes from Telegram servers.

        A session already stored in ``client.media_sessions`` for the file's DC
        is reused. Otherwise a new session is started, stored there, and
        returned.

        Raises:
            AuthBytesInvalid: if importing the exported authorization into a
                foreign DC fails on all six attempts (the new session is
                stopped first).
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)
        if media_session is None:
            if file_id.dc_id != await client.storage.dc_id():
                # The file lives on another DC: create a fresh auth key there
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()
                # Export our authorization and import it on the new session,
                # retrying up to six times while the bytes are rejected
                for _ in range(6):
                    exported_auth = await client.invoke(
                        raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
                    try:
                        # Import the authorization bytes for the DC
                        await media_session.invoke(
                            raw.functions.auth.ImportAuthorization(
                                id=exported_auth.id, bytes=exported_auth.bytes))
                        break
                    except AuthBytesInvalid:
                        logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                        continue
                else:
                    # Every attempt failed: release the session before giving up
                    await media_session.stop()
                    raise AuthBytesInvalid
            else:
                # Reuse the stored auth key if we're already connected to the correct DC
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await client.storage.auth_key(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()
            logging.debug(f"Created media session for DC {file_id.dc_id}")
            client.media_sessions[file_id.dc_id] = media_session  # Cache the media session
        else:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return media_session

    @staticmethod
    async def get_location(file_id: FileId) -> Union[
        raw.types.InputPhotoFileLocation,
        raw.types.InputDocumentFileLocation,
        raw.types.InputPeerPhotoFileLocation,
    ]:
        """
        Returns the file location for the media file based on its type.

        Chat photos map to ``InputPeerPhotoFileLocation``, photos to
        ``InputPhotoFileLocation`` and every other file type to
        ``InputDocumentFileLocation``.
        """
        file_type = file_id.file_type
        if file_type == FileType.CHAT_PHOTO:
            # Handle the case for chat photos
            if file_id.chat_id > 0:
                peer = raw.types.InputPeerUser(user_id=file_id.chat_id, access_hash=file_id.chat_access_hash)
            else:
                # The module-level name `utils` is bound twice (pyrogram first,
                # then FileStream), so the later FileStream import wins. Import
                # pyrogram's helper explicitly so the lookup is unambiguous.
                from pyrogram.utils import get_channel_id

                peer = raw.types.InputPeerChannel(
                    channel_id=get_channel_id(file_id.chat_id),
                    access_hash=file_id.chat_access_hash,
                )
            location = raw.types.InputPeerPhotoFileLocation(
                peer=peer,
                volume_id=file_id.volume_id,
                local_id=file_id.local_id,
                big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
            )
        elif file_type == FileType.PHOTO:
            # Handle regular photos
            location = raw.types.InputPhotoFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        else:
            # Handle document files
            location = raw.types.InputDocumentFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        return location

    async def yield_file(
        self,
        file_id: FileId,
        index: int,
        offset: int,
        first_part_cut: int,
        last_part_cut: int,
        part_count: int,
        chunk_size: int,
    ) -> Union[str, None]:
        """
        Yields the file in chunks based on the specified range and chunk size.

        This is an async generator: it yields the ``bytes`` of each
        ``upload.GetFile`` response (the declared return annotation is kept
        unchanged for compatibility). ``WORK_LOADS[index]`` is incremented for
        the lifetime of the stream and always decremented when it ends.

        Args:
            file_id: Properties of the file to stream.
            index: Key into ``WORK_LOADS`` for the client doing the work.
            offset: Byte offset of the first request; advanced by
                ``chunk_size`` after every part.
            first_part_cut: Start index applied to the first chunk.
            last_part_cut: End index applied to the last chunk.
            part_count: Number of parts to yield.
            chunk_size: ``limit`` passed to every ``GetFile`` request.

        Errors raised while fetching chunks are logged and end the stream;
        errors raised while preparing the media session or location propagate.
        """
        client = self.client
        WORK_LOADS[index] += 1  # Increase the workload for this client
        logging.debug(f"Starting to yield file with client {index}.")
        current_part = 1
        try:
            # Setup is inside the outer try so the workload counter is released
            # even when the media session or the location cannot be built.
            media_session = await self.generate_media_session(client, file_id)
            location = await self.get_location(file_id)
            try:
                # Fetch the first chunk
                r = await media_session.invoke(
                    raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
                if isinstance(r, raw.types.upload.File):
                    # Stream the file in chunks
                    while True:
                        chunk = r.bytes
                        if not chunk:
                            break
                        elif part_count == 1:
                            # Single part: trim both ends
                            yield chunk[first_part_cut:last_part_cut]
                        elif current_part == 1:
                            yield chunk[first_part_cut:]
                        elif current_part == part_count:
                            yield chunk[:last_part_cut]
                        else:
                            yield chunk
                        current_part += 1
                        offset += chunk_size
                        if current_part > part_count:
                            break
                        r = await media_session.invoke(
                            raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
            except (TimeoutError, AttributeError) as e:
                # Best-effort streaming: end quietly, but leave a trace for debugging
                logging.debug(f"Stream for client {index} stopped early: {e!r}")
            except Exception as e:
                logging.info(f"Error at Bytestreamer Generating Chunk : {e}")
        finally:
            # current_part is one ahead of the number of parts already yielded
            logging.debug(f"Finished yielding file with {current_part - 1} parts.")
            WORK_LOADS[index] -= 1  # Decrease the workload for this client

    async def clean_cache(self) -> None:
        """
        Periodically clean the cache to reduce memory usage.

        Runs forever: sleeps for ``clean_timer`` seconds, clears the cached
        file properties, and repeats. It stops only when its task is cancelled.
        """
        while True:
            await asyncio.sleep(self.clean_timer)  # Wait for the cleanup interval
            logging.info("*** Cleaning cached file IDs...")
            self.cached_file_ids.clear()  # Clear the cache
            logging.debug("Cache cleaned.")
|