File size: 5,282 Bytes
e566133
 
 
2a429c7
faa9726
e566133
 
 
 
 
 
 
 
4d957e1
e566133
 
2a429c7
d7a5ee1
faa9726
e566133
 
a88b8c8
 
59e7379
51cfe77
5dac619
a88b8c8
fdbc27b
5dac619
81b9358
5dac619
a88b8c8
8fde8ae
4718e29
a88b8c8
51cfe77
5dac619
08193bb
a88b8c8
 
51cfe77
 
5dac619
8fde8ae
5dac619
4718e29
 
a88b8c8
 
 
 
 
 
 
 
 
 
59e7379
a88b8c8
 
 
 
 
 
 
 
59e7379
a88b8c8
 
 
 
 
 
 
 
 
51cfe77
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59e7379
 
 
 
 
c29dc6c
 
 
 
 
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import time
import math
import logging
import asyncio
import mimetypes
import traceback

from aiohttp import web
from aiohttp.http_exceptions import BadStatusLine
from pyrogram import raw

#---------------------Local Upload---------------------#

from FileStream.config import Telegram
from FileStream.bot import req_client, FileStream
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
from FileStream.Exceptions import FileNotFound, InvalidHash
from FileStream.utils.FileProcessors.custom_ul import  TeleUploader


async def media_streamer(request: web.Request, db_id: str, speed: str):
    """Stream a Telegram-hosted file over HTTP with Range support.

    Args:
        request: Incoming aiohttp request; its ``Range`` header selects a
            byte span of the file.
        db_id: Database identifier used to look up the file's properties.
        speed: ``"FAST"`` selects 1 MiB chunks; any other value 512 KiB.

    Returns:
        ``web.Response`` — 206 for ranged requests, 200 for full-file
        requests, or 416 when the range is malformed or unsatisfiable.

    Raises:
        web.HTTPInternalServerError: on any unexpected failure while
            resolving or streaming the file.
    """
    # Empty string (falsy) when the client sent no Range header, so the
    # status-code choice below still works with a plain truthiness test.
    range_header = request.headers.get("Range", "")

    # Pick a worker client for this remote address (load-balancing lives
    # in req_client).
    Worker = await req_client(request.remote)
    client = Worker.get("client")

    # Reuse a cached ByteStreamer for this client, or create and cache one.
    if client in ACTIVE_CLIENTS:
        logging.debug(f"Cached ByteStreamer object for client {Worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}")
        tg_connect = ACTIVE_CLIENTS[client]
    else:
        tg_connect = utils.ByteStreamer(client)
        ACTIVE_CLIENTS[client] = tg_connect
        logging.debug(f"New ByteStreamer object for client {Worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}")

    logging.info(f"Client :{Worker['index']} # Serving Destination : {request.headers.get('X-FORWARDED-FOR', request.remote)}")
    tg_connect.update_last_activity()

    try:
        # Fetch file properties once and reuse them throughout.
        logging.debug("Fetching file properties")
        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
        file_size = file_id.file_size

        from_bytes, until_bytes = parse_range(range_header, file_size)

        # parse_range yields (None, None) for a malformed header; treat
        # that the same as an unsatisfiable range instead of letting the
        # comparisons below raise a TypeError.
        if (
            from_bytes is None
            or until_bytes is None
            or until_bytes > file_size
            or from_bytes < 0
            or until_bytes < from_bytes
        ):
            return web.Response(
                status=416,
                body="416: Range not satisfiable",
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        # Chunk size drives how much is pulled from Telegram per part.
        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024

        # Clamp to the last valid byte index of the file.
        until_bytes = min(until_bytes, file_size - 1)

        # Translate the byte span into chunk-aligned offsets and cuts.
        offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size)

        # Async generator that yields the requested chunks.
        body = tg_connect.yield_file(
            file_id, Worker['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
        )

        # Prefer the stored MIME type; fall back to guessing from the
        # file name, then to a generic binary type.
        mime_type = file_id.mime_type or mimetypes.guess_type(file_id.file_name)[0] or "application/octet-stream"
        file_name = utils.get_name(file_id)
        disposition = "attachment"

        req_length = until_bytes - from_bytes + 1
        return web.Response(
            status=206 if range_header else 200,
            body=body,
            headers={
                "Content-Type": mime_type,
                "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
                "Content-Length": str(req_length),
                "Content-Disposition": f'{disposition}; filename="{file_name}"',
                "Accept-Ranges": "bytes",
            },
        )
    except Exception:
        logging.error(f"Error in media_streamer: {traceback.format_exc()}")
        raise web.HTTPInternalServerError()  # Surface any failure as a 500


def parse_range(range_header: str, file_size: int):
    """Parse an HTTP ``Range`` header value into an inclusive byte span.

    Supports ``bytes=A-B``, open-ended ``bytes=A-`` and RFC 7233 suffix
    ranges ``bytes=-N`` (the final N bytes of the file); the previous
    version rejected suffix ranges via the ValueError path.

    Args:
        range_header: Raw header value; falsy means "whole file".
        file_size: Total size of the file in bytes.

    Returns:
        ``(from_bytes, until_bytes)`` inclusive indices, or
        ``(None, None)`` when the header is malformed.
    """
    try:
        if range_header:
            raw_from, raw_until = range_header.replace("bytes=", "").split("-")
            if raw_from:
                from_bytes = int(raw_from)
                until_bytes = int(raw_until) if raw_until else file_size - 1
            else:
                # Suffix range "bytes=-N": last N bytes, clamped at 0.
                from_bytes = max(file_size - int(raw_until), 0)
                until_bytes = file_size - 1
        else:
            from_bytes = 0
            until_bytes = file_size - 1
        return from_bytes, until_bytes
    except ValueError:
        # Non-numeric bounds or the wrong number of "-" separators.
        return None, None


def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
    """Compute chunk-aligned offsets and cuts for streaming a byte span.

    Args:
        from_bytes: First requested byte index (inclusive).
        until_bytes: Last requested byte index (inclusive).
        chunk_size: Size of each chunk fetched from the backend.

    Returns:
        ``(offset, first_part_cut, last_part_cut, part_count)`` where
        ``offset`` is the chunk-aligned start position, the two cuts trim
        the first and last chunks to the requested span, and
        ``part_count`` is the number of chunks to fetch.
    """
    # Align the start position down to a chunk boundary.
    offset = from_bytes - (from_bytes % chunk_size)
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1
    # Inclusive count of chunks spanned. The previous
    # ceil(until/chunk) - floor(offset/chunk) form undercounted by one
    # whenever until_bytes landed exactly on a chunk boundary (and gave
    # 0 parts for the single-byte range 0-0).
    part_count = (until_bytes // chunk_size) - (offset // chunk_size) + 1
    return offset, first_part_cut, last_part_cut, part_count