privateone commited on
Commit
a88b8c8
·
1 Parent(s): 0d06489

Code Updates & Optimisations: Server Upgrade Test

Browse files
FileStream/server/Functions/downloader.py CHANGED
@@ -19,80 +19,97 @@ from FileStream.Exceptions import FIleNotFound, InvalidHash
19
  from FileStream.utils.FileProcessors.custom_ul import TeleUploader
20
 
21
 
22
- async def media_streamer(request: web.Request, db_id: str, speed:str):
23
-
24
- range_header = request.headers.get("Range", 0)
25
- #index = minWORK_LOADS, keyWORK_LOADS.get)
26
- #faster_client = MULTI_CLIENTS[index]
27
-
28
- client = await req_client()
29
-
30
- if Telegram.MULTI_CLIENT:
31
- logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR',request.remote)}")
32
-
33
- if client['client'] in ACTIVE_CLIENTS:
34
- tg_connect = ACTIVE_CLIENTS[client['client']]
35
- logging.debug(f"Using cached ByteStreamer object for client {client['index']}")
36
-
37
- else:
38
- logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
39
- tg_connect = utils.ByteStreamer(client['client'])
40
- ACTIVE_CLIENTS[client['client']] = tg_connect
41
-
42
- logging.debug("before calling get_file_properties")
43
- file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
44
- logging.debug("after calling get_file_properties")
45
-
46
- file_size = file_id.file_size
47
-
48
- if range_header:
49
- from_bytes, until_bytes = range_header.replace("bytes=", "").split("-")
50
- from_bytes = int(from_bytes)
51
- until_bytes = int(until_bytes) if until_bytes else file_size - 1
52
- else:
53
- from_bytes = request.http_range.start or 0
54
- until_bytes = (request.http_range.stop or file_size) - 1
55
-
56
- if (until_bytes > file_size) or (from_bytes < 0) or (until_bytes < from_bytes):
57
-
58
- return web.Response(
59
- status=416,
60
- body="416: Range not satisfiable",
61
- headers={"Content-Range": f"bytes */{file_size}"},
62
- )
63
-
64
- chunk_size = 4 * 1024 * 1024 if speed == "FAST" else 512 * 1024
65
-
66
- until_bytes = min(until_bytes, file_size - 1)
67
-
68
- offset = from_bytes - (from_bytes % chunk_size)
69
- first_part_cut = from_bytes - offset
70
- last_part_cut = until_bytes % chunk_size + 1
71
-
72
- req_length = until_bytes - from_bytes + 1
73
- part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size)
74
-
75
- body = tg_connect.yield_file(file_id, client['index'], offset, first_part_cut,last_part_cut, part_count, chunk_size)
76
-
77
- mime_type = file_id.mime_type
78
- file_name = utils.get_name(file_id)
79
- disposition = "attachment"
80
-
81
- if not mime_type:
82
- mime_type = mimetypes.guess_type(file_name)[0] or "application/octet-stream"
83
-
84
- # if "video/" in mime_type or "audio/" in mime_type:
85
- # disposition = "inline"
86
-
87
- return web.Response(
88
- status=206 if range_header else 200,
89
- body=body,
90
- headers={
91
- "Content-Type": f"{mime_type}",
92
- "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
93
- "Content-Length": str(req_length),
94
- "Content-Disposition": f'{disposition}; filename="{file_name}"',
95
- "Accept-Ranges": "bytes",
96
- },
97
- )
98
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  from FileStream.utils.FileProcessors.custom_ul import TeleUploader
20
 
21
 
22
async def media_streamer(request: web.Request, db_id: str, speed: str):
    """Stream a stored file over HTTP, honouring Range requests.

    Args:
        request: incoming aiohttp request (may carry a ``Range`` header).
        db_id: database identifier of the file to stream.
        speed: ``"FAST"`` selects the larger (1 MiB) chunk size.

    Returns:
        ``web.Response`` with status 200 (full file), 206 (partial content)
        or 416 (range not satisfiable).
    """
    # Default to None so a missing Range header is falsy. A "0" default is a
    # truthy string: it made every plain request look ranged, fail parsing,
    # and come back as 416.
    range_header = request.headers.get("Range", None)
    client = await req_client()
    if Telegram.MULTI_CLIENT:
        logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}")

    # Reuse a cached ByteStreamer for this client, or create and cache one.
    tg_connect = ACTIVE_CLIENTS.get(client['client'], None)
    if tg_connect is None:
        logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
        tg_connect = utils.ByteStreamer(client['client'])
        ACTIVE_CLIENTS[client['client']] = tg_connect
    else:
        tg_connect.update_last_activity()
        logging.debug(f"Using cached ByteStreamer object for client {client['index']}")

    try:
        logging.debug("Fetching file properties")
        file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
        file_size = file_id.file_size

        if range_header:
            from_bytes, until_bytes = parse_range(range_header, file_size)
        else:
            # No Range header: serve the whole file.
            from_bytes, until_bytes = 0, file_size - 1

        # Reject malformed, inverted or out-of-bounds ranges with 416
        # (this validation was present before the refactor and must stay).
        if (from_bytes is None or until_bytes is None
                or from_bytes < 0
                or until_bytes < from_bytes
                or from_bytes >= file_size):
            return web.Response(
                status=416,
                body="416: Range not satisfiable",
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        # Chunk size depends on requested speed tier.
        chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024

        # Never read past the end of the file.
        until_bytes = min(until_bytes, file_size - 1)

        offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size)

        # Async generator that yields the requested byte range.
        body = tg_connect.yield_file(
            file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
        )

        file_name = utils.get_name(file_id)
        # Fall back to guessing from the display name when Telegram supplies
        # no MIME type (guess from the same name used in Content-Disposition).
        mime_type = file_id.mime_type or mimetypes.guess_type(file_name)[0] or "application/octet-stream"
        disposition = "attachment"

        req_length = until_bytes - from_bytes + 1
        return web.Response(
            status=206 if range_header else 200,
            body=body,
            headers={
                "Content-Type": mime_type,
                "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
                "Content-Length": str(req_length),
                "Content-Disposition": f'{disposition}; filename="{file_name}"',
                "Accept-Ranges": "bytes",
            },
        )
    except web.HTTPException:
        # Let deliberate HTTP responses (416 etc.) propagate untouched.
        raise
    except Exception:
        logging.error(f"Error in media_streamer: {traceback.format_exc()}")
        raise web.HTTPInternalServerError()
95
+
96
+
97
def parse_range(range_header: str, file_size: int):
    """Parse an HTTP ``Range`` header value into ``(from_bytes, until_bytes)``.

    Supports the ``bytes=A-B``, open-ended ``bytes=A-`` and suffix
    ``bytes=-N`` (last N bytes) forms of RFC 7233.

    Returns:
        ``(from_bytes, until_bytes)`` as ints, or ``(None, None)`` when the
        header is malformed or the range is not satisfiable for ``file_size``
        (so the caller can answer 416).
    """
    try:
        from_str, _, until_str = range_header.replace("bytes=", "").partition("-")
        if from_str:
            from_bytes = int(from_str)
            until_bytes = int(until_str) if until_str else file_size - 1
        elif until_str:
            # Suffix form "bytes=-N": the last N bytes of the file.
            from_bytes = max(file_size - int(until_str), 0)
            until_bytes = file_size - 1
        else:
            return None, None
    except ValueError:
        return None, None
    # Out-of-bounds or inverted ranges are unsatisfiable.
    if from_bytes < 0 or until_bytes < from_bytes or from_bytes >= file_size:
        return None, None
    return from_bytes, until_bytes
107
+
108
+
109
def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
    """Compute chunk-aligned fetch parameters for a byte range.

    Returns:
        ``(offset, first_part_cut, last_part_cut, part_count)`` where
        ``offset`` is the chunk-aligned start position, the two cuts trim the
        first and last fetched chunks down to the requested range, and
        ``part_count`` is the number of chunks to fetch.
    """
    offset = from_bytes - (from_bytes % chunk_size)
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1
    # Count chunks by chunk-index difference. The previous
    # ceil(until/chunk) - floor(offset/chunk) formula dropped the final chunk
    # whenever until_bytes was an exact multiple of chunk_size.
    part_count = (until_bytes // chunk_size) - (from_bytes // chunk_size) + 1
    return offset, first_part_cut, last_part_cut, part_count
FileStream/utils/FileProcessors/custom_dl.py CHANGED
@@ -23,200 +23,210 @@ from FileStream.Exceptions import FIleNotFound, InvalidHash
23
  from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
24
 
25
 
 
26
  class ByteStreamer:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
- def __init__(self, client: Client):
29
- self.clean_timer = 30 * 60
30
- self.client: Client = client
31
- self.cached_file_ids: Dict[str, FileId] = {}
32
- asyncio.create_task(self.clean_cache())
33
-
34
- async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
35
- """
36
- Returns the properties of a media of a specific message in a FIleId class.
37
- if the properties are cached, then it'll return the cached results.
38
- or it'll generate the properties from the Message ID and cache them.
39
  """
40
- if not db_id in self.cached_file_ids:
41
- logging.debug("Before Calling generate_file_properties")
42
- await self.generate_file_properties(db_id, MULTI_CLIENTS)
43
- logging.debug(f"Cached file properties for file with ID {db_id}")
44
- return self.cached_file_ids[db_id]
45
-
46
- async def generate_file_properties(self, db_id: str,
47
- MULTI_CLIENTS) -> FileId:
48
- """
49
  Generates the properties of a media file on a specific message.
50
- returns ths properties in a FIleId class.
 
 
 
 
 
 
 
 
 
51
  """
52
- logging.debug("Before calling get_file_ids")
53
- file_id = await get_file_ids(self.client, db_id, Message)
54
- logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
55
- self.cached_file_ids[db_id] = file_id
56
- logging.debug(f"Cached media file with ID {db_id}")
57
- return self.cached_file_ids[db_id]
58
-
59
- async def generate_media_session(self, client: Client,
60
- file_id: FileId) -> Session:
61
- """
62
  Generates the media session for the DC that contains the media file.
63
  This is required for getting the bytes from Telegram servers.
64
  """
65
-
66
- media_session = client.media_sessions.get(file_id.dc_id, None)
67
-
68
- if media_session is None:
69
- if file_id.dc_id != await client.storage.dc_id():
70
- media_session = Session(
71
- client,
72
- file_id.dc_id,
73
- await Auth(client, file_id.dc_id, await
74
- client.storage.test_mode()).create(),
75
- await client.storage.test_mode(),
76
- is_media=True,
77
- )
78
- await media_session.start()
79
-
80
- for _ in range(6):
81
- exported_auth = await client.invoke(
82
- raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
83
-
84
- try:
85
- await media_session.invoke(
86
- raw.functions.auth.ImportAuthorization(
87
- id=exported_auth.id, bytes=exported_auth.bytes))
88
- break
89
- except AuthBytesInvalid:
90
- logging.debug(
91
- f"Invalid authorization bytes for DC {file_id.dc_id}")
92
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
  else:
94
- await media_session.stop()
95
- raise AuthBytesInvalid
96
- else:
97
- media_session = Session(
98
- client,
99
- file_id.dc_id,
100
- await client.storage.auth_key(),
101
- await client.storage.test_mode(),
102
- is_media=True,
103
- )
104
- await media_session.start()
105
- logging.debug(f"Created media session for DC {file_id.dc_id}")
106
- client.media_sessions[file_id.dc_id] = media_session
107
- else:
108
- logging.debug(f"Using cached media session for DC {file_id.dc_id}")
109
- return media_session
110
-
111
- @staticmethod
112
- async def get_location(
113
- file_id: FileId
114
- ) -> Union[
115
- raw.types.InputPhotoFileLocation,
116
- raw.types.InputDocumentFileLocation,
117
- raw.types.InputPeerPhotoFileLocation,
118
- ]:
119
- """
120
- Returns the file location for the media file.
121
  """
122
- file_type = file_id.file_type
123
-
124
- if file_type == FileType.CHAT_PHOTO:
125
- if file_id.chat_id > 0:
126
- peer = raw.types.InputPeerUser(user_id=file_id.chat_id,
127
- access_hash=file_id.chat_access_hash)
128
- else:
129
- if file_id.chat_access_hash == 0:
130
- peer = raw.types.InputPeerChat(chat_id=-file_id.chat_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  else:
132
- peer = raw.types.InputPeerChannel(
133
- channel_id=utils.get_channel_id(file_id.chat_id),
134
- access_hash=file_id.chat_access_hash,
135
- )
136
-
137
- location = raw.types.InputPeerPhotoFileLocation(
138
- peer=peer,
139
- volume_id=file_id.volume_id,
140
- local_id=file_id.local_id,
141
- big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
142
- )
143
- elif file_type == FileType.PHOTO:
144
- location = raw.types.InputPhotoFileLocation(
145
- id=file_id.media_id,
146
- access_hash=file_id.access_hash,
147
- file_reference=file_id.file_reference,
148
- thumb_size=file_id.thumbnail_size,
149
- )
150
- else:
151
- location = raw.types.InputDocumentFileLocation(
152
- id=file_id.media_id,
153
- access_hash=file_id.access_hash,
154
- file_reference=file_id.file_reference,
155
- thumb_size=file_id.thumbnail_size,
156
- )
157
- return location
158
-
159
- async def yield_file(
160
- self,
161
- file_id: FileId,
162
- index: int,
163
- offset: int,
164
- first_part_cut: int,
165
- last_part_cut: int,
166
- part_count: int,
167
- chunk_size: int,
168
- ) -> Union[str, None]:
169
-
170
- client = self.client
171
- WORK_LOADS[index] += 1
172
- logging.debug(f"Starting to yielding file with client {index}.")
173
- media_session = await self.generate_media_session(client, file_id)
174
-
175
- current_part = 1
176
-
177
- location = await self.get_location(file_id)
178
-
179
- try:
180
- r = await media_session.invoke(
181
- raw.functions.upload.GetFile(location=location,
182
- offset=offset,
183
- limit=chunk_size), )
184
- if isinstance(r, raw.types.upload.File):
185
- while True:
186
- chunk = r.bytes
187
- if not chunk:
188
- break
189
- elif part_count == 1:
190
- yield chunk[first_part_cut:last_part_cut]
191
- elif current_part == 1:
192
- yield chunk[first_part_cut:]
193
- elif current_part == part_count:
194
- yield chunk[:last_part_cut]
195
- else:
196
- yield chunk
197
-
198
- current_part += 1
199
- offset += chunk_size
200
-
201
- if current_part > part_count:
202
- break
203
-
204
- r = await media_session.invoke(
205
- raw.functions.upload.GetFile(location=location,
206
- offset=offset,
207
- limit=chunk_size), )
208
- except (TimeoutError, AttributeError):
209
- pass
210
- finally:
211
- logging.debug(f"Finished yielding file with {current_part} parts.")
212
- WORK_LOADS[index] -= 1
213
-
214
- async def clean_cache(self) -> None:
215
- """
216
- function to clean the cache to reduce memory usage
217
  """
218
- while True:
219
- await asyncio.sleep(self.clean_timer)
220
- print("** Caches Cleared :", self.cached_file_ids)
221
- self.cached_file_ids.clear()
222
- logging.debug("Cleaned the cache")
 
 
 
 
23
  from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
24
 
25
 
26
+
27
  class ByteStreamer:
28
+ def __init__(self, client: Client):
29
+ self.clean_timer = 30 * 60 # Cache cleanup timer set to 30 minutes
30
+ self.client: Client = client
31
+ self.cached_file_ids: Dict[str, FileId] = {} # Cache to store file properties by db_id
32
+ self.last_activity: float = asyncio.get_event_loop().time() # Track last activity time for the client
33
+ asyncio.create_task(self.clean_cache()) # Start the cache cleanup task
34
+
35
+ def update_last_activity(self):
36
+ """Update the last activity time to the current time."""
37
+ self.last_activity = asyncio.get_event_loop().time()
38
+
39
+ def get_last_activity(self) -> float:
40
+ """Get the last activity time of this client."""
41
+ return self.last_activity
42
+
43
+ async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
44
+ """
45
+ Returns the properties of a media of a specific message in a FileId class.
46
+ If the properties are cached, it'll return the cached results.
47
+ Otherwise, it'll generate the properties from the Message ID and cache them.
48
+ """
49
+ if db_id not in self.cached_file_ids:
50
+ logging.debug("File properties not cached. Generating properties.")
51
+ await self.generate_file_properties(db_id, MULTI_CLIENTS) # Generate and cache the file properties
52
+ logging.debug(f"Cached file properties for file with ID {db_id}")
53
+ return self.cached_file_ids[db_id]
54
 
55
+ async def generate_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
 
 
 
 
 
 
 
 
 
 
56
  """
 
 
 
 
 
 
 
 
 
57
  Generates the properties of a media file on a specific message.
58
+ Returns the properties in a FileId class.
59
+ """
60
+ logging.debug("Generating file properties.")
61
+ file_id = await get_file_ids(self.client, db_id, Message) # Call the method to get the file properties
62
+ logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
63
+ self.cached_file_ids[db_id] = file_id # Cache the file properties
64
+ logging.debug(f"Cached media file with ID {db_id}")
65
+ return file_id
66
+
67
+ async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
68
  """
 
 
 
 
 
 
 
 
 
 
69
  Generates the media session for the DC that contains the media file.
70
  This is required for getting the bytes from Telegram servers.
71
  """
72
+ media_session = client.media_sessions.get(file_id.dc_id, None)
73
+
74
+ if media_session is None:
75
+ if file_id.dc_id != await client.storage.dc_id():
76
+ # Create a new media session if one doesn't exist for this DC ID
77
+ media_session = Session(
78
+ client,
79
+ file_id.dc_id,
80
+ await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
81
+ await client.storage.test_mode(),
82
+ is_media=True,
83
+ )
84
+ await media_session.start()
85
+
86
+ # Attempt to import authorization from Telegram's servers
87
+ for _ in range(6):
88
+ exported_auth = await client.invoke(
89
+ raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
90
+
91
+ try:
92
+ # Import the authorization bytes for the DC
93
+ await media_session.invoke(
94
+ raw.functions.auth.ImportAuthorization(
95
+ id=exported_auth.id, bytes=exported_auth.bytes))
96
+ break
97
+ except AuthBytesInvalid:
98
+ logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
99
+ continue
100
+ else:
101
+ await media_session.stop()
102
+ raise AuthBytesInvalid
103
+ else:
104
+ # Reuse the stored auth key if we're already connected to the correct DC
105
+ media_session = Session(
106
+ client,
107
+ file_id.dc_id,
108
+ await client.storage.auth_key(),
109
+ await client.storage.test_mode(),
110
+ is_media=True,
111
+ )
112
+ await media_session.start()
113
+
114
+ logging.debug(f"Created media session for DC {file_id.dc_id}")
115
+ client.media_sessions[file_id.dc_id] = media_session # Cache the media session
116
  else:
117
+ logging.debug(f"Using cached media session for DC {file_id.dc_id}")
118
+ return media_session
119
+
120
+ @staticmethod
121
+ async def get_location(file_id: FileId) -> Union[
122
+ raw.types.InputPhotoFileLocation,
123
+ raw.types.InputDocumentFileLocation,
124
+ raw.types.InputPeerPhotoFileLocation,
125
+ ]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
  """
127
+ Returns the file location for the media file based on its type (Photo or Document).
128
+ """
129
+ file_type = file_id.file_type
130
+
131
+ if file_type == FileType.CHAT_PHOTO:
132
+ # Handle the case for chat photos
133
+ if file_id.chat_id > 0:
134
+ peer = raw.types.InputPeerUser(user_id=file_id.chat_id, access_hash=file_id.chat_access_hash)
135
+ else:
136
+ peer = raw.types.InputPeerChannel(
137
+ channel_id=utils.get_channel_id(file_id.chat_id),
138
+ access_hash=file_id.chat_access_hash,
139
+ )
140
+
141
+ location = raw.types.InputPeerPhotoFileLocation(
142
+ peer=peer,
143
+ volume_id=file_id.volume_id,
144
+ local_id=file_id.local_id,
145
+ big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
146
+ )
147
+ elif file_type == FileType.PHOTO:
148
+ # Handle regular photos
149
+ location = raw.types.InputPhotoFileLocation(
150
+ id=file_id.media_id,
151
+ access_hash=file_id.access_hash,
152
+ file_reference=file_id.file_reference,
153
+ thumb_size=file_id.thumbnail_size,
154
+ )
155
  else:
156
+ # Handle document files
157
+ location = raw.types.InputDocumentFileLocation(
158
+ id=file_id.media_id,
159
+ access_hash=file_id.access_hash,
160
+ file_reference=file_id.file_reference,
161
+ thumb_size=file_id.thumbnail_size,
162
+ )
163
+ return location
164
+
165
+ async def yield_file(
166
+ self,
167
+ file_id: FileId,
168
+ index: int,
169
+ offset: int,
170
+ first_part_cut: int,
171
+ last_part_cut: int,
172
+ part_count: int,
173
+ chunk_size: int,
174
+ ) -> Union[str, None]:
175
+ """
176
+ Yields the file in chunks based on the specified range and chunk size.
177
+ This method streams the file from Telegram's server, breaking it into smaller parts.
178
+ """
179
+ client = self.client
180
+ WORK_LOADS[index] += 1 # Increase the workload for this client
181
+ logging.debug(f"Starting to yield file with client {index}.")
182
+ media_session = await self.generate_media_session(client, file_id)
183
+
184
+ current_part = 1
185
+ location = await self.get_location(file_id)
186
+
187
+ try:
188
+ # Fetch the file chunks
189
+ r = await media_session.invoke(
190
+ raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
191
+
192
+ if isinstance(r, raw.types.upload.File):
193
+ # Stream the file in chunks
194
+ while True:
195
+ chunk = r.bytes
196
+ if not chunk:
197
+ break
198
+ elif part_count == 1:
199
+ yield chunk[first_part_cut:last_part_cut]
200
+ elif current_part == 1:
201
+ yield chunk[first_part_cut:]
202
+ elif current_part == part_count:
203
+ yield chunk[:last_part_cut]
204
+ else:
205
+ yield chunk
206
+
207
+ current_part += 1
208
+ offset += chunk_size
209
+
210
+ if current_part > part_count:
211
+ break
212
+
213
+ r = await media_session.invoke(
214
+ raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
215
+ except (TimeoutError, AttributeError):
216
+ pass
217
+ except Exception as e:
218
+ logging.info(f"Error at Bytestreamer Generating Chunk : {e}")
219
+ finally:
220
+ logging.debug(f"Finished yielding file with {current_part} parts.")
221
+ WORK_LOADS[index] -= 1 # Decrease the workload for this client
222
+
223
+ async def clean_cache(self) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  """
225
+ Function to clean the cache to reduce memory usage.
226
+ This method will be called periodically to clear the cached file properties.
227
+ """
228
+ await asyncio.sleep(self.clean_timer) # Wait for the cleanup interval
229
+ logging.info("*** Cleaning cached file IDs...")
230
+ self.cached_file_ids.clear() # Clear the cache
231
+ logging.debug("Cache cleaned.")
232
+