privateone committed on
Commit 2a429c7 · 1 Parent(s): e3d3fc0

Code Updates & Optimisations: Clearing up ACTIVE_CLIENTS

FileStream/server/Functions/downloader.py CHANGED
@@ -1,8 +1,8 @@
 import os
 import time
 import math
-import logging
 import asyncio
+import logging
 import traceback
 from aiohttp import web
 from pyrogram import raw
@@ -14,14 +14,17 @@ from FileStream.config import Telegram
 from FileStream.bot import req_client, FileStream
 from FileStream import utils, StartTime, __version__
 from FileStream.Tools import mime_identifier, Time_ISTKolNow
-from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
+from FileStream.utils.FileProcessors.custom_ul import TeleUploader
 from FileStream.server.exceptions import FIleNotFound, InvalidHash
+from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
 from FileStream.server.render_template import render_page, render_upload
-from FileStream.utils.FileProcessors.custom_ul import TeleUploader
+
 
 
 async def media_streamer(request: web.Request, db_id: str, speed: str):
-    # Get the Range header from the request, default to 0 if not present
+    """
+    Handle streaming of media files to the client.
+    """
     range_header = request.headers.get("Range", "bytes=0-")
     client = await req_client()
 
@@ -29,20 +32,23 @@ async def media_streamer(request: web.Request, db_id: str, speed: str):
     if Telegram.MULTI_CLIENT:
         logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}")
 
-    # Use an existing ByteStreamer or create a new one
+    # Get or create the ByteStreamer object
     tg_connect = ACTIVE_CLIENTS.get(client['client'], None)
 
     if tg_connect is None:
         logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
         tg_connect = utils.ByteStreamer(client['client'])
-        ACTIVE_CLIENTS[client['client']] = tg_connect
+        ACTIVE_CLIENTS[client['client']] = {
+            "last_activity": asyncio.get_event_loop().time(),  # Store current timestamp
+            "streamer": tg_connect
+        }
     else:
         logging.debug(f"Using cached ByteStreamer object for client {client['index']}")
 
     try:
         # Fetch file properties once and use it throughout
         logging.debug("Fetching file properties")
-        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
+        file_id = await tg_connect["streamer"].get_file_properties(db_id, MULTI_CLIENTS)
         file_size = file_id.file_size
 
         # Parse range header efficiently
@@ -57,7 +63,7 @@ async def media_streamer(request: web.Request, db_id: str, speed: str):
         )
 
         # Set chunk size based on speed
-        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024
+        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024
 
         # Ensure we don't go past the file size
         until_bytes = min(until_bytes, file_size - 1)
@@ -66,7 +72,7 @@ async def media_streamer(request: web.Request, db_id: str, speed: str):
         offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size)
 
         # Request the file chunks
-        body = tg_connect.yield_file(
+        body = tg_connect["streamer"].yield_file(
             file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
         )
 
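With this change, each entry in ACTIVE_CLIENTS is a dict of the form {"streamer": ByteStreamer, "last_activity": timestamp}, which is the shape the cleanup task in FileStream/server/__init__.py below relies on. As a minimal sketch of how a request path could read such an entry and refresh its timestamp on every hit (the get_streamer helper is illustrative only, not part of this commit; it assumes ACTIVE_CLIENTS is a plain dict keyed by the Pyrogram client):

import asyncio
from FileStream import utils
from FileStream.bot import ACTIVE_CLIENTS

def get_streamer(client):
    # Return the cached ByteStreamer for this client, creating the cache
    # entry if needed and refreshing its last_activity timestamp.
    now = asyncio.get_event_loop().time()
    entry = ACTIVE_CLIENTS.get(client)
    if entry is None:
        entry = {"streamer": utils.ByteStreamer(client), "last_activity": now}
        ACTIVE_CLIENTS[client] = entry
    else:
        entry["last_activity"] = now  # a busy client keeps postponing its eviction
    return entry["streamer"]

Refreshing last_activity on cache hits, not only on creation, is what would keep a client that is still serving requests from being evicted by the periodic cleanup loop.
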
FileStream/server/__init__.py CHANGED
@@ -10,26 +10,23 @@ from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
 
 
 # Set time to consider a client inactive (e.g., 10 minutes)
-INACTIVITY_TIMEOUT = 600  # 10 minutes
+INACTIVITY_TIMEOUT = 240  # 4 minutes
 
 async def clear_inactive_clients():
-    """
-    Periodically checks and clears inactive clients from ACTIVE_CLIENTS.
-    """
+    """Clear inactive clients from ACTIVE_CLIENTS."""
     while True:
-        await asyncio.sleep(INACTIVITY_TIMEOUT)  # Check every 10 minutes
+        await asyncio.sleep(INACTIVITY_TIMEOUT)  # Check every INACTIVITY_TIMEOUT seconds
 
-        # Find clients that are inactive and clear them
-        now = asyncio.get_event_loop().time()
+        now = asyncio.get_event_loop().time()  # Get current time
         inactive_clients = [
-            client for client, last_activity in ACTIVE_CLIENTS.items()
-            if now - last_activity > INACTIVITY_TIMEOUT
+            client for client, data in ACTIVE_CLIENTS.items()
+            if now - data["last_activity"] > INACTIVITY_TIMEOUT  # Compare with last_activity timestamp
         ]
 
         for client in inactive_clients:
             # Log and clear the inactive client
             logging.info(f"Clearing inactive client: {client}")
-            del ACTIVE_CLIENTS[client]  # Clear the client
+            del ACTIVE_CLIENTS[client]  # Remove inactive client from ACTIVE_CLIENTS
 
 
 async def root_route_handler(request):
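
clear_inactive_clients is an infinite loop, so it only takes effect once it is scheduled as a background task; that wiring is not part of this diff. A minimal sketch of one way to launch and stop it with aiohttp startup/cleanup hooks (the web_app name is an assumption, not something shown in this commit):

import asyncio
from aiohttp import web

async def start_cleanup_task(app: web.Application):
    # Schedule the periodic ACTIVE_CLIENTS cleanup when the web app starts.
    app["cleanup_task"] = asyncio.create_task(clear_inactive_clients())

async def stop_cleanup_task(app: web.Application):
    # Cancel the loop on shutdown so the process can exit cleanly.
    app["cleanup_task"].cancel()

# Hypothetical wiring; the actual FileStream application object is not shown here:
# web_app.on_startup.append(start_cleanup_task)
# web_app.on_cleanup.append(stop_cleanup_task)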