import os import time import math import logging import asyncio import traceback from aiohttp import web from pyrogram import raw from aiohttp.http_exceptions import BadStatusLine #---------------------Local Upload---------------------# from FileStream.config import Telegram from FileStream.bot import req_client, FileStream from FileStream import utils, StartTime, __version__ from FileStream.Tools import mime_identifier, Time_ISTKolNow from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS from FileStream.Exceptions import FileNotFound, InvalidHash from FileStream.utils.FileProcessors.custom_ul import TeleUploader async def media_streamer(request: web.Request, db_id: str, speed: str): # Get the Range header from the request, default to 0 if not present range_header = request.headers.get("Range", 0) Worker = await req_client(request.remote) client= Worker.get("client") # Log client info if multi-client mode #if Telegram.MULTI_CLIENT: #print("ACTIVE Client",ACTIVE_CLIENTS) #logging.info(f"Client {Worker['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}") # Use an existing ByteStreamer or create a new one #tg_connect = ACTIVE_CLIENTS.get(client['client'], None) if client in ACTIVE_CLIENTS: logging.debug(f"Cached ByteStreamer object for client {Worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}") tg_connect = ACTIVE_CLIENTS[client] else: tg_connect = utils.ByteStreamer(client) ACTIVE_CLIENTS[client] = tg_connect logging.debug(f"New ByteStreamer object for client {Worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}") logging.info(f"Client :{Worker['index']} # Serving Destination : {request.headers.get('X-FORWARDED-FOR', request.remote)}") tg_connect.update_last_activity() try: # Fetch file properties once and use it throughout logging.debug("Fetching file properties") file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS) file_size = file_id.file_size # Parse range header efficiently from_bytes, until_bytes = parse_range(range_header, file_size) # If range is invalid, return a 416 error if (until_bytes > file_size) or (from_bytes < 0) or (until_bytes < from_bytes): return web.Response( status=416, body="416: Range not satisfiable", headers={"Content-Range": f"bytes */{file_size}"}, ) # Set chunk size based on speed chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024 # Ensure we don't go past the file size until_bytes = min(until_bytes, file_size - 1) # Compute offset and range parts offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size) # Request the file chunks body = tg_connect.yield_file( file_id, Worker['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size ) # Determine MIME type and filename mime_type = file_id.mime_type or mimetypes.guess_type(file_id.file_name)[0] or "application/octet-stream" file_name = utils.get_name(file_id) disposition = "attachment" # Return the response with proper headers and status req_length = until_bytes - from_bytes + 1 return web.Response( status=206 if range_header else 200, body=body, headers={ "Content-Type": mime_type, "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}", "Content-Length": str(req_length), "Content-Disposition": f'{disposition}; filename="{file_name}"', "Accept-Ranges": "bytes", }, ) except Exception as e: logging.error(f"Error in media_streamer: {traceback.format_exc()}") raise web.HTTPInternalServerError() # Re-raise the exception as a server error def parse_range(range_header: str, file_size: int): """Helper function to parse the range header.""" try: if range_header: from_bytes, until_bytes = range_header.replace("bytes=", "").split("-") from_bytes = int(from_bytes) until_bytes = int(until_bytes) if until_bytes else file_size - 1 else: from_bytes = 0 until_bytes = file_size - 1 #request :web.Request , #from_bytes = request.http_range.start or 0 #until_bytes = (request.http_range.stop or file_size) - 1 return from_bytes, until_bytes except ValueError: return None, None def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int): """Compute the offsets, cuts, and part counts for file chunking.""" offset = from_bytes - (from_bytes % chunk_size) first_part_cut = from_bytes - offset last_part_cut = until_bytes % chunk_size + 1 part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size) return offset, first_part_cut, last_part_cut, part_count