File size: 4,597 Bytes
d0b286c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import time
import math
import logging
import asyncio
import traceback
from aiohttp import web
from pyrogram import raw
from aiohttp.http_exceptions import BadStatusLine

#---------------------Local Upload---------------------#

from FileStream.config import Telegram
from FileStream.bot import req_client, FileStream
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
from FileStream.Exceptions import FIleNotFound, InvalidHash
from FileStream.utils.FileProcessors.custom_ul import  TeleUploader


async def media_streamer(request: web.Request, db_id: str, speed: str):
    # Get the Range header from the request, default to 0 if not present
    range_header = request.headers.get("Range", 0)
    client = await req_client()
    # Log client info if multi-client mode
    if Telegram.MULTI_CLIENT:
        logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}")

    # Use an existing ByteStreamer or create a new one
    tg_connect = ACTIVE_CLIENTS.get(client['client'], None)
    
    if tg_connect is None:
        logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
        tg_connect = utils.ByteStreamer(client['client'])
        ACTIVE_CLIENTS[client['client']] = tg_connect

    else:
        tg_connect.update_last_activity()
        logging.debug(f"Using cached ByteStreamer object for client {client['index']}")

    try:
        # Fetch file properties once and use it throughout
        logging.debug("Fetching file properties")
        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
        file_size = file_id.file_size

        # Parse range header efficiently
        from_bytes, until_bytes = parse_range(range_header, file_size)

        # If range is invalid, return a 416 error
        if from_bytes is None or until_bytes is None:
            return web.Response(
                status=416,
                body="416: Range not satisfiable",
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        # Set chunk size based on speed
        chunk_size =  1024 * 1024 if speed == "FAST" else 512 * 1024

        # Ensure we don't go past the file size
        until_bytes = min(until_bytes, file_size - 1)

        # Compute offset and range parts
        offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size)

        # Request the file chunks
        body = tg_connect.yield_file(
            file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
        )

        # Determine MIME type and filename
        mime_type = file_id.mime_type or mimetypes.guess_type(file_id.file_name)[0] or "application/octet-stream"
        file_name = utils.get_name(file_id)
        disposition = "attachment"

        # Return the response with proper headers and status
        req_length = until_bytes - from_bytes + 1
        return web.Response(
            status=206 if range_header else 200,
            body=body,
            headers={
                "Content-Type": mime_type,
                "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
                "Content-Length": str(req_length),
                "Content-Disposition": f'{disposition}; filename="{file_name}"',
                "Accept-Ranges": "bytes",
            },
        )
    except Exception as e:
        logging.error(f"Error in media_streamer: {traceback.format_exc()}")
        raise web.HTTPInternalServerError()  # Re-raise the exception as a server error


def parse_range(range_header: str, file_size: int):
    """Helper function to parse the range header."""
    try:
        range_str = range_header.replace("bytes=", "")
        from_bytes, until_bytes = range_str.split("-")
        from_bytes = int(from_bytes)
        until_bytes = int(until_bytes) if until_bytes else file_size - 1
        return from_bytes, until_bytes
    except ValueError:
        return None, None


def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
    """Compute the offsets, cuts, and part counts for file chunking."""
    offset = from_bytes - (from_bytes % chunk_size)
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1
    part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size)
    return offset, first_part_cut, last_part_cut, part_count