File size: 5,282 Bytes
e566133 2a429c7 faa9726 e566133 4d957e1 e566133 2a429c7 d7a5ee1 faa9726 e566133 a88b8c8 59e7379 51cfe77 5dac619 a88b8c8 fdbc27b 5dac619 81b9358 5dac619 a88b8c8 8fde8ae 4718e29 a88b8c8 51cfe77 5dac619 08193bb a88b8c8 51cfe77 5dac619 8fde8ae 5dac619 4718e29 a88b8c8 59e7379 a88b8c8 59e7379 a88b8c8 51cfe77 a88b8c8 59e7379 c29dc6c a88b8c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import asyncio
import logging
import math
import mimetypes
import os
import time
import traceback

from aiohttp import web
from aiohttp.http_exceptions import BadStatusLine
from pyrogram import raw

#---------------------Local Upload---------------------#
from FileStream.config import Telegram
from FileStream.bot import req_client, FileStream
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
from FileStream.Exceptions import FileNotFound, InvalidHash
from FileStream.utils.FileProcessors.custom_ul import TeleUploader
async def media_streamer(request: web.Request, db_id: str, speed: str):
    """Stream the file identified by ``db_id`` to the client, honouring ``Range``.

    Args:
        request: Incoming aiohttp request. Its ``Range`` header (if any) selects
            the byte span to send; ``request.remote`` is used to pick a worker.
        db_id: Identifier handed to ``get_file_properties`` to look up the file.
        speed: ``"FAST"`` selects 1 MiB chunks; any other value selects 512 KiB.

    Returns:
        A ``web.Response`` whose body is the iterator returned by
        ``yield_file`` (status 206 when a ``Range`` header was sent, otherwise
        200), or a 416 response when the range is malformed or unsatisfiable.

    Raises:
        web.HTTPInternalServerError: On any unexpected failure while preparing
            the response; the traceback is logged first.
    """
    range_header = request.headers.get("Range", "")
    worker = await req_client(request.remote)
    client = worker.get("client")

    # Reuse the ByteStreamer cached for this client, or create and cache one.
    if client in ACTIVE_CLIENTS:
        logging.debug(f"Cached ByteStreamer object for client {worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}")
        tg_connect = ACTIVE_CLIENTS[client]
    else:
        tg_connect = utils.ByteStreamer(client)
        ACTIVE_CLIENTS[client] = tg_connect
        logging.debug(f"New ByteStreamer object for client {worker['index']} # Serving Destination :{request.headers.get('X-FORWARDED-FOR', request.remote)}")

    logging.info(f"Client :{worker['index']} # Serving Destination : {request.headers.get('X-FORWARDED-FOR', request.remote)}")
    tg_connect.update_last_activity()

    try:
        # Fetch file properties once and use them throughout.
        logging.debug("Fetching file properties")
        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
        file_size = file_id.file_size

        from_bytes, until_bytes = parse_range(range_header, file_size)

        # parse_range signals a malformed header with (None, None); only
        # clamp when we actually got numbers back.
        range_parsed = from_bytes is not None and until_bytes is not None
        if range_parsed:
            # An end position at or past EOF means "through the last byte"
            # (RFC 7233 section 2.1), so clamp instead of rejecting.
            until_bytes = min(until_bytes, file_size - 1)

        # Malformed header, negative start, or a start beyond the (clamped)
        # end: nothing can be served for this range.
        if (not range_parsed) or (from_bytes < 0) or (until_bytes < from_bytes):
            return web.Response(
                status=416,
                body="416: Range not satisfiable",
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        # Chunk size depends on the requested speed tier.
        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024

        # Translate the byte span into chunk-aligned download parameters.
        offset, first_part_cut, last_part_cut, part_count = compute_offsets(
            from_bytes, until_bytes, chunk_size
        )

        body = tg_connect.yield_file(
            file_id, worker['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
        )

        # Prefer the stored MIME type, then guess from the file name (only if
        # there is one), then fall back to a generic binary type.
        guessed_type = (
            mimetypes.guess_type(file_id.file_name)[0] if file_id.file_name else None
        )
        mime_type = file_id.mime_type or guessed_type or "application/octet-stream"
        file_name = utils.get_name(file_id)
        disposition = "attachment"

        req_length = until_bytes - from_bytes + 1
        return web.Response(
            status=206 if range_header else 200,
            body=body,
            headers={
                "Content-Type": mime_type,
                "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
                "Content-Length": str(req_length),
                "Content-Disposition": f'{disposition}; filename="{file_name}"',
                "Accept-Ranges": "bytes",
            },
        )
    except Exception as exc:
        logging.error(f"Error in media_streamer: {traceback.format_exc()}")
        # Surface as a 500 but keep the original exception chained for debugging.
        raise web.HTTPInternalServerError() from exc
def parse_range(range_header: str, file_size: int):
    """Parse a single-range HTTP ``Range`` header into inclusive byte positions.

    Supported forms (after the ``bytes=`` prefix is removed):

    * ``start-end`` -> ``(start, end)``
    * ``start-``    -> ``(start, file_size - 1)``
    * ``-length``   -> the last ``length`` bytes of the file (suffix range)

    Args:
        range_header: Raw header value; any falsy value means "no range".
        file_size: Total size of the file in bytes.

    Returns:
        ``(from_bytes, until_bytes)``, both inclusive. Returns ``(None, None)``
        when the header cannot be parsed (non-numeric parts, multiple ranges).
        The values are not validated against ``file_size``; that is left to
        the caller.
    """
    if not range_header:
        return 0, file_size - 1

    try:
        # Unpacking raises ValueError for anything but exactly one "-", which
        # also rejects multi-range headers such as "0-1,5-6".
        start_text, end_text = range_header.replace("bytes=", "").split("-")

        if not start_text.strip():
            # Suffix range: "-N" asks for the final N bytes. A suffix longer
            # than the file simply selects the whole file.
            suffix_length = int(end_text)
            return max(file_size - suffix_length, 0), file_size - 1

        from_bytes = int(start_text)
        until_bytes = int(end_text) if end_text else file_size - 1
    except ValueError:
        return None, None

    return from_bytes, until_bytes
def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
    """Map an inclusive byte span onto chunk-aligned download parameters.

    Args:
        from_bytes: First requested byte (inclusive).
        until_bytes: Last requested byte (inclusive).
        chunk_size: Size in bytes of each chunk.

    Returns:
        A tuple ``(offset, first_part_cut, last_part_cut, part_count)``:

        * ``offset``: ``from_bytes`` rounded down to a chunk boundary.
        * ``first_part_cut``: bytes to skip at the start of the first chunk.
        * ``last_part_cut``: number of bytes to keep from the last chunk.
        * ``part_count``: how many chunks overlap the requested span.
    """
    first_chunk_index = from_bytes // chunk_size
    last_chunk_index = until_bytes // chunk_size

    offset = first_chunk_index * chunk_size
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1

    # until_bytes is inclusive, so the span touches every chunk from the one
    # holding from_bytes through the one holding until_bytes. Using
    # ceil(until_bytes / chunk_size) would drop the final chunk whenever
    # until_bytes is an exact multiple of chunk_size (including 0), and float
    # division loses precision for very large offsets.
    part_count = last_chunk_index - first_chunk_index + 1
    return offset, first_part_cut, last_part_cut, part_count
|