File size: 4,802 Bytes
63bf853
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53b6652
63bf853
 
 
 
 
 
8fde8ae
 
 
63bf853
 
 
8fde8ae
 
63bf853
8fde8ae
 
 
63bf853
8fde8ae
 
 
63bf853
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fde8ae
63bf853
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fde8ae
63bf853
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import logging
import math
import mimetypes
import os
import time
import traceback

from aiohttp import web
from aiohttp.http_exceptions import BadStatusLine
from pyrogram import raw

#---------------------Local Upload---------------------#

from FileStream.config import Telegram
from FileStream.bot import req_client, FileStream
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
from FileStream.Exceptions import FileNotFound, InvalidHash
from FileStream.utils.FileProcessors.custom_ul import  TeleUploader


async def media_streamer(request: web.Request, db_id: str, speed: str):
    """Stream a stored file to the HTTP client, honouring the ``Range`` header.

    Picks the client whose ``WORK_LOADS`` entry is smallest, reuses (or
    creates and caches in ``ACTIVE_CLIENTS``) its ``utils.ByteStreamer``, and
    returns a response whose body is that streamer's ``yield_file`` result.

    Args:
        request: Incoming aiohttp request; only its headers and remote
            address are read here.
        db_id: Identifier handed to ``ByteStreamer.get_file_properties``.
        speed: ``"FAST"`` selects 1 MiB chunks; any other value selects
            512 KiB chunks.

    Returns:
        A ``web.Response`` with status 206 when a ``Range`` header was sent
        and 200 otherwise, or a 416 response when the requested range is
        malformed or cannot be satisfied.

    Raises:
        web.HTTPInternalServerError: If anything fails while preparing the
            response; the traceback is logged before raising.
    """
    # None (falsy) when the client did not ask for a byte range.
    range_header = request.headers.get("Range")

    # Serve from the client that currently has the lowest recorded workload.
    index = min(WORK_LOADS, key=WORK_LOADS.get)
    faster_client = MULTI_CLIENTS[index]

    if Telegram.MULTI_CLIENT:
        logging.info(f"Client {index} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}")

    if faster_client in ACTIVE_CLIENTS:
        tg_connect = ACTIVE_CLIENTS[faster_client]
        logging.debug(f"Using cached ByteStreamer object for client {index}")
    else:
        logging.debug(f"Creating new ByteStreamer object forclient {index}")
        tg_connect = utils.ByteStreamer(faster_client)
        ACTIVE_CLIENTS[faster_client] = tg_connect

    try:
        # Fetch file properties once and use them throughout.
        logging.debug("Fetching file properties")
        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
        file_size = file_id.file_size

        from_bytes, until_bytes = parse_range(range_header, file_size)

        # parse_range signals a malformed header with (None, None); treat it
        # like any other unsatisfiable range instead of crashing on the
        # comparison. A start at or past the end of the file is also
        # unsatisfiable, even though the end bound may equal file_size.
        range_is_valid = (
            from_bytes is not None
            and until_bytes is not None
            and 0 <= from_bytes <= until_bytes <= file_size
            and from_bytes < file_size
        )
        if not range_is_valid:
            return web.Response(
                status=416,
                body="416: Range not satisfiable",
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        # Chunk size depends on the requested speed tier.
        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024

        # Ensure we don't go past the last byte of the file.
        until_bytes = min(until_bytes, file_size - 1)

        offset, first_part_cut, last_part_cut, part_count = compute_offsets(
            from_bytes, until_bytes, chunk_size
        )

        body = tg_connect.yield_file(
            file_id, index, offset, first_part_cut, last_part_cut, part_count, chunk_size
        )

        # Prefer the stored MIME type, then guess from the file name (only
        # when there is one), then fall back to a generic binary type.
        guessed_type = None
        if not file_id.mime_type and file_id.file_name:
            guessed_type = mimetypes.guess_type(file_id.file_name)[0]
        mime_type = file_id.mime_type or guessed_type or "application/octet-stream"
        file_name = utils.get_name(file_id)
        disposition = "attachment"

        req_length = until_bytes - from_bytes + 1
        return web.Response(
            status=206 if range_header else 200,
            body=body,
            headers={
                "Content-Type": mime_type,
                "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
                "Content-Length": str(req_length),
                "Content-Disposition": f'{disposition}; filename="{file_name}"',
                "Accept-Ranges": "bytes",
            },
        )
    except Exception:
        logging.error(f"Error in media_streamer: {traceback.format_exc()}")
        raise web.HTTPInternalServerError()  # Surface the failure as a 500


def parse_range(range_header: str, file_size: int):
    """Parse a single-range HTTP ``Range`` header into inclusive byte bounds.

    Supported forms (the ``bytes=`` prefix is stripped if present):

    * ``bytes=A-B`` -> ``(A, B)``
    * ``bytes=A-``  -> ``(A, file_size - 1)``
    * ``bytes=-N``  -> the last ``N`` bytes, i.e.
      ``(max(file_size - N, 0), file_size - 1)``

    Args:
        range_header: Raw header value; any falsy value (``None``, ``0``,
            ``""``) means "no range requested".
        file_size: Total size of the file in bytes.

    Returns:
        ``(from_bytes, until_bytes)``. When no range was requested this is
        the whole file, ``(0, file_size - 1)``. When the header cannot be
        parsed (non-numeric bounds, multiple ranges, missing ``-``) the
        result is ``(None, None)``. The bounds are not validated against
        ``file_size``; that is left to the caller.
    """
    if not range_header:
        return 0, file_size - 1

    try:
        # Unpacking raises ValueError unless there is exactly one "-".
        first, last = range_header.replace("bytes=", "").split("-")

        if not first.strip():
            # Suffix form "-N": the final N bytes of the file.
            suffix_length = int(last)
            return max(file_size - suffix_length, 0), file_size - 1

        from_bytes = int(first)
        until_bytes = int(last) if last else file_size - 1
    except ValueError:
        return None, None

    return from_bytes, until_bytes


def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
    """Compute the offsets, cuts, and part count for file chunking.

    Args:
        from_bytes: Index of the first requested byte (inclusive).
        until_bytes: Index of the last requested byte (inclusive).
        chunk_size: Size in bytes of each chunk.

    Returns:
        A tuple ``(offset, first_part_cut, last_part_cut, part_count)``:

        * ``offset`` - ``from_bytes`` rounded down to a chunk boundary.
        * ``first_part_cut`` - bytes to skip at the start of the first chunk.
        * ``last_part_cut`` - bytes to keep from the start of the last chunk.
        * ``part_count`` - number of chunks spanning the requested range.
    """
    offset = from_bytes - (from_bytes % chunk_size)
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1

    # Count chunks by index (last chunk index - first chunk index + 1).
    # A ceil() of the float quotient undercounts by one whenever until_bytes
    # falls exactly on a chunk boundary, because that byte is the first byte
    # of the *next* chunk.
    first_chunk = offset // chunk_size
    last_chunk = until_bytes // chunk_size
    part_count = last_chunk - first_chunk + 1

    return offset, first_part_cut, last_part_cut, part_count