|
import os |
|
import time |
|
import math |
|
import logging |
|
import asyncio |
|
import traceback |
|
from aiohttp import web |
|
from pyrogram import raw |
|
from aiohttp.http_exceptions import BadStatusLine |
|
|
|
|
|
|
|
from FileStream.config import Telegram |
|
from FileStream.bot import req_client, FileStream |
|
from FileStream import utils, StartTime, __version__ |
|
from FileStream.Tools import mime_identifier, Time_ISTKolNow |
|
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS |
|
from FileStream.Exceptions import FIleNotFound, InvalidHash |
|
from FileStream.utils.FileProcessors.custom_ul import TeleUploader |
|
|
|
|
|
async def media_streamer(request: web.Request, db_id: str, speed: str): |
|
|
|
range_header = request.headers.get("Range", 0) |
|
client = await req_client() |
|
|
|
if Telegram.MULTI_CLIENT: |
|
logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}") |
|
|
|
|
|
tg_connect = ACTIVE_CLIENTS.get(client['client'], None) |
|
|
|
if tg_connect is None: |
|
logging.debug(f"Creating new ByteStreamer object for client {client['index']}") |
|
tg_connect = utils.ByteStreamer(client['client']) |
|
ACTIVE_CLIENTS[client['client']] = tg_connect |
|
|
|
else: |
|
tg_connect.update_last_activity() |
|
logging.debug(f"Using cached ByteStreamer object for client {client['index']}") |
|
|
|
try: |
|
|
|
logging.debug("Fetching file properties") |
|
file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS) |
|
file_size = file_id.file_size |
|
|
|
|
|
from_bytes, until_bytes = parse_range(range_header, file_size) |
|
|
|
|
|
if from_bytes is None or until_bytes is None: |
|
return web.Response( |
|
status=416, |
|
body="416: Range not satisfiable", |
|
headers={"Content-Range": f"bytes */{file_size}"}, |
|
) |
|
|
|
|
|
chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024 |
|
|
|
|
|
until_bytes = min(until_bytes, file_size - 1) |
|
|
|
|
|
offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size) |
|
|
|
|
|
body = tg_connect.yield_file( |
|
file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size |
|
) |
|
|
|
|
|
mime_type = file_id.mime_type or mimetypes.guess_type(file_id.file_name)[0] or "application/octet-stream" |
|
file_name = utils.get_name(file_id) |
|
disposition = "attachment" |
|
|
|
|
|
req_length = until_bytes - from_bytes + 1 |
|
return web.Response( |
|
status=206 if range_header else 200, |
|
body=body, |
|
headers={ |
|
"Content-Type": mime_type, |
|
"Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}", |
|
"Content-Length": str(req_length), |
|
"Content-Disposition": f'{disposition}; filename="{file_name}"', |
|
"Accept-Ranges": "bytes", |
|
}, |
|
) |
|
except Exception as e: |
|
logging.error(f"Error in media_streamer: {traceback.format_exc()}") |
|
raise web.HTTPInternalServerError() |
|
|
|
|
|
def parse_range(range_header: str, file_size: int): |
|
"""Helper function to parse the range header.""" |
|
try: |
|
range_str = range_header.replace("bytes=", "") |
|
from_bytes, until_bytes = range_str.split("-") |
|
from_bytes = int(from_bytes) |
|
until_bytes = int(until_bytes) if until_bytes else file_size - 1 |
|
return from_bytes, until_bytes |
|
except ValueError: |
|
return None, None |
|
|
|
|
|
def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int): |
|
"""Compute the offsets, cuts, and part counts for file chunking.""" |
|
offset = from_bytes - (from_bytes % chunk_size) |
|
first_part_cut = from_bytes - offset |
|
last_part_cut = until_bytes % chunk_size + 1 |
|
part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size) |
|
return offset, first_part_cut, last_part_cut, part_count |
|
|