privateone committed on
Commit
0d06489
·
1 Parent(s): d0b286c

Code Updates & Optimisations: Server Upgraded — Minor Changes Due to Download Problem

Browse files
FileStream/utils/FileProcessors/custom_dl.py CHANGED
@@ -23,210 +23,200 @@ from FileStream.Exceptions import FIleNotFound, InvalidHash
23
  from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
24
 
25
 
26
-
27
  class ByteStreamer:
28
- def __init__(self, client: Client):
29
- self.clean_timer = 30 * 60 # Cache cleanup timer set to 30 minutes
30
- self.client: Client = client
31
- self.cached_file_ids: Dict[str, FileId] = {} # Cache to store file properties by db_id
32
- self.last_activity: float = asyncio.get_event_loop().time() # Track last activity time for the client
33
- asyncio.create_task(self.clean_cache()) # Start the cache cleanup task
34
-
35
- def update_last_activity(self):
36
- """Update the last activity time to the current time."""
37
- self.last_activity = asyncio.get_event_loop().time()
38
-
39
- def get_last_activity(self) -> float:
40
- """Get the last activity time of this client."""
41
- return self.last_activity
42
-
43
- async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
44
- """
45
- Returns the properties of a media of a specific message in a FileId class.
46
- If the properties are cached, it'll return the cached results.
47
- Otherwise, it'll generate the properties from the Message ID and cache them.
48
- """
49
- if db_id not in self.cached_file_ids:
50
- logging.debug("File properties not cached. Generating properties.")
51
- await self.generate_file_properties(db_id, MULTI_CLIENTS) # Generate and cache the file properties
52
- logging.debug(f"Cached file properties for file with ID {db_id}")
53
- return self.cached_file_ids[db_id]
54
 
55
- async def generate_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
 
 
 
 
 
 
 
 
 
 
56
  """
 
 
 
 
 
 
 
 
 
57
  Generates the properties of a media file on a specific message.
58
- Returns the properties in a FileId class.
59
- """
60
- logging.debug("Generating file properties.")
61
- file_id = await get_file_ids(self.client, db_id, Message) # Call the method to get the file properties
62
- logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
63
- self.cached_file_ids[db_id] = file_id # Cache the file properties
64
- logging.debug(f"Cached media file with ID {db_id}")
65
- return file_id
66
-
67
- async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
68
  """
 
 
 
 
 
 
 
 
 
 
69
  Generates the media session for the DC that contains the media file.
70
  This is required for getting the bytes from Telegram servers.
71
  """
72
- media_session = client.media_sessions.get(file_id.dc_id, None)
73
-
74
- if media_session is None:
75
- if file_id.dc_id != await client.storage.dc_id():
76
- # Create a new media session if one doesn't exist for this DC ID
77
- media_session = Session(
78
- client,
79
- file_id.dc_id,
80
- await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
81
- await client.storage.test_mode(),
82
- is_media=True,
83
- )
84
- await media_session.start()
85
-
86
- # Attempt to import authorization from Telegram's servers
87
- for _ in range(6):
88
- exported_auth = await client.invoke(
89
- raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
90
-
91
- try:
92
- # Import the authorization bytes for the DC
93
- await media_session.invoke(
94
- raw.functions.auth.ImportAuthorization(
95
- id=exported_auth.id, bytes=exported_auth.bytes))
96
- break
97
- except AuthBytesInvalid:
98
- logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
99
- continue
100
- else:
101
- await media_session.stop()
102
- raise AuthBytesInvalid
103
- else:
104
- # Reuse the stored auth key if we're already connected to the correct DC
105
- media_session = Session(
106
- client,
107
- file_id.dc_id,
108
- await client.storage.auth_key(),
109
- await client.storage.test_mode(),
110
- is_media=True,
111
- )
112
- await media_session.start()
113
-
114
- logging.debug(f"Created media session for DC {file_id.dc_id}")
115
- client.media_sessions[file_id.dc_id] = media_session # Cache the media session
116
  else:
117
- logging.debug(f"Using cached media session for DC {file_id.dc_id}")
118
- return media_session
119
-
120
- @staticmethod
121
- async def get_location(file_id: FileId) -> Union[
122
- raw.types.InputPhotoFileLocation,
123
- raw.types.InputDocumentFileLocation,
124
- raw.types.InputPeerPhotoFileLocation,
125
- ]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
  """
127
- Returns the file location for the media file based on its type (Photo or Document).
128
- """
129
- file_type = file_id.file_type
130
-
131
- if file_type == FileType.CHAT_PHOTO:
132
- # Handle the case for chat photos
133
- if file_id.chat_id > 0:
134
- peer = raw.types.InputPeerUser(user_id=file_id.chat_id, access_hash=file_id.chat_access_hash)
135
- else:
136
- peer = raw.types.InputPeerChannel(
137
- channel_id=utils.get_channel_id(file_id.chat_id),
138
- access_hash=file_id.chat_access_hash,
139
- )
140
-
141
- location = raw.types.InputPeerPhotoFileLocation(
142
- peer=peer,
143
- volume_id=file_id.volume_id,
144
- local_id=file_id.local_id,
145
- big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
146
- )
147
- elif file_type == FileType.PHOTO:
148
- # Handle regular photos
149
- location = raw.types.InputPhotoFileLocation(
150
- id=file_id.media_id,
151
- access_hash=file_id.access_hash,
152
- file_reference=file_id.file_reference,
153
- thumb_size=file_id.thumbnail_size,
154
- )
155
  else:
156
- # Handle document files
157
- location = raw.types.InputDocumentFileLocation(
158
- id=file_id.media_id,
159
- access_hash=file_id.access_hash,
160
- file_reference=file_id.file_reference,
161
- thumb_size=file_id.thumbnail_size,
162
- )
163
- return location
164
-
165
- async def yield_file(
166
- self,
167
- file_id: FileId,
168
- index: int,
169
- offset: int,
170
- first_part_cut: int,
171
- last_part_cut: int,
172
- part_count: int,
173
- chunk_size: int,
174
- ) -> Union[str, None]:
175
- """
176
- Yields the file in chunks based on the specified range and chunk size.
177
- This method streams the file from Telegram's server, breaking it into smaller parts.
178
- """
179
- client = self.client
180
- WORK_LOADS[index] += 1 # Increase the workload for this client
181
- logging.debug(f"Starting to yield file with client {index}.")
182
- media_session = await self.generate_media_session(client, file_id)
183
-
184
- current_part = 1
185
- location = await self.get_location(file_id)
186
-
187
- try:
188
- # Fetch the file chunks
189
- r = await media_session.invoke(
190
- raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
191
-
192
- if isinstance(r, raw.types.upload.File):
193
- # Stream the file in chunks
194
- while True:
195
- chunk = r.bytes
196
- if not chunk:
197
- break
198
- elif part_count == 1:
199
- yield chunk[first_part_cut:last_part_cut]
200
- elif current_part == 1:
201
- yield chunk[first_part_cut:]
202
- elif current_part == part_count:
203
- yield chunk[:last_part_cut]
204
- else:
205
- yield chunk
206
-
207
- current_part += 1
208
- offset += chunk_size
209
-
210
- if current_part > part_count:
211
- break
212
-
213
- r = await media_session.invoke(
214
- raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
215
- except (TimeoutError, AttributeError):
216
- pass
217
- except Exception as e:
218
- logging.info(f"Error at Bytestreamer Generating Chunk : {e}")
219
- finally:
220
- logging.debug(f"Finished yielding file with {current_part} parts.")
221
- WORK_LOADS[index] -= 1 # Decrease the workload for this client
222
-
223
- async def clean_cache(self) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  """
225
- Function to clean the cache to reduce memory usage.
226
- This method will be called periodically to clear the cached file properties.
227
- """
228
- await asyncio.sleep(self.clean_timer) # Wait for the cleanup interval
229
- logging.info("*** Cleaning cached file IDs...")
230
- self.cached_file_ids.clear() # Clear the cache
231
- logging.debug("Cache cleaned.")
232
-
 
23
  from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
24
 
25
 
 
26
  class ByteStreamer:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
def __init__(self, client: Client):
    """Initialise the streamer for a single Telegram client.

    Args:
        client: The pyrogram ``Client`` used to fetch media.
    """
    # Interval between cache purges: 30 minutes.
    self.clean_timer = 30 * 60
    self.client: Client = client
    # Per-db_id cache of generated FileId properties.
    self.cached_file_ids: Dict[str, FileId] = {}
    # Keep a reference to the background task: the event loop holds only
    # weak references to tasks, so an unreferenced task may be
    # garbage-collected and the periodic cleanup would silently stop.
    self._cache_cleaner_task = asyncio.create_task(self.clean_cache())
33
+
34
async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
    """Return the ``FileId`` properties for the media identified by ``db_id``.

    If the properties are cached, the cached result is returned; otherwise
    they are generated from the stored message and cached first.

    Args:
        db_id: Database identifier of the media file.
        MULTI_CLIENTS: Available Telegram clients, passed through to
            ``generate_file_properties`` (not read directly here).

    Returns:
        The ``FileId`` describing the media file.
    """
    # Idiomatic membership test: "db_id not in", not "not db_id in".
    if db_id not in self.cached_file_ids:
        logging.debug("Before Calling generate_file_properties")
        await self.generate_file_properties(db_id, MULTI_CLIENTS)
    logging.debug(f"Cached file properties for file with ID {db_id}")
    return self.cached_file_ids[db_id]
45
+
46
async def generate_file_properties(self, db_id: str,
                                   MULTI_CLIENTS) -> FileId:
    """Generate, cache and return the ``FileId`` for the media behind ``db_id``.

    Args:
        db_id: Database identifier of the media file.
        MULTI_CLIENTS: Available Telegram clients (kept for interface
            compatibility; not read directly here).

    Returns:
        The freshly generated ``FileId``.
    """
    logging.debug("Before calling get_file_ids")
    file_id = await get_file_ids(self.client, db_id, Message)
    logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
    self.cached_file_ids[db_id] = file_id
    logging.debug(f"Cached media file with ID {db_id}")
    # Return the value we just computed directly; re-reading the dict entry
    # that was written one line above is a redundant lookup.
    return file_id
58
+
59
async def generate_media_session(self, client: Client,
                                 file_id: FileId) -> Session:
    """Return a media ``Session`` for the DC that hosts the file.

    A session is created and cached on first use; subsequent calls for the
    same DC reuse the cached session. This is required for downloading
    bytes from Telegram servers.
    """
    # Fast path: a session for this DC already exists.
    session = client.media_sessions.get(file_id.dc_id, None)
    if session is not None:
        logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return session

    if file_id.dc_id != await client.storage.dc_id():
        # Foreign DC: create a fresh auth key there, then import our
        # authorization into the new session.
        session = Session(
            client,
            file_id.dc_id,
            await Auth(client, file_id.dc_id, await
                       client.storage.test_mode()).create(),
            await client.storage.test_mode(),
            is_media=True,
        )
        await session.start()

        # Telegram may reject the imported bytes transiently; retry a few
        # times before giving up.
        for _ in range(6):
            exported_auth = await client.invoke(
                raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
            try:
                await session.invoke(
                    raw.functions.auth.ImportAuthorization(
                        id=exported_auth.id, bytes=exported_auth.bytes))
                break
            except AuthBytesInvalid:
                logging.debug(
                    f"Invalid authorization bytes for DC {file_id.dc_id}")
                continue
        else:
            # Every attempt failed: tear the session down and surface it.
            await session.stop()
            raise AuthBytesInvalid
    else:
        # Same DC as the main connection: reuse the stored auth key.
        session = Session(
            client,
            file_id.dc_id,
            await client.storage.auth_key(),
            await client.storage.test_mode(),
            is_media=True,
        )
        await session.start()

    logging.debug(f"Created media session for DC {file_id.dc_id}")
    client.media_sessions[file_id.dc_id] = session
    return session
110
+
111
@staticmethod
async def get_location(
    file_id: FileId
) -> Union[
    raw.types.InputPhotoFileLocation,
    raw.types.InputDocumentFileLocation,
    raw.types.InputPeerPhotoFileLocation,
]:
    """Build the raw input-location object that addresses this media file.

    Chat photos are addressed via their owning peer; regular photos and
    documents via their media id / access hash / file reference.
    """
    if file_id.file_type == FileType.CHAT_PHOTO:
        # Resolve the peer that owns the chat photo.
        if file_id.chat_id > 0:
            peer = raw.types.InputPeerUser(
                user_id=file_id.chat_id,
                access_hash=file_id.chat_access_hash,
            )
        elif file_id.chat_access_hash == 0:
            # Basic groups carry no access hash; their id is negated.
            peer = raw.types.InputPeerChat(chat_id=-file_id.chat_id)
        else:
            peer = raw.types.InputPeerChannel(
                channel_id=utils.get_channel_id(file_id.chat_id),
                access_hash=file_id.chat_access_hash,
            )
        return raw.types.InputPeerPhotoFileLocation(
            peer=peer,
            volume_id=file_id.volume_id,
            local_id=file_id.local_id,
            big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
        )

    # Photos and documents share the same field layout; only the raw type
    # differs.
    location_cls = (
        raw.types.InputPhotoFileLocation
        if file_id.file_type == FileType.PHOTO
        else raw.types.InputDocumentFileLocation
    )
    return location_cls(
        id=file_id.media_id,
        access_hash=file_id.access_hash,
        file_reference=file_id.file_reference,
        thumb_size=file_id.thumbnail_size,
    )
158
+
159
async def yield_file(
    self,
    file_id: FileId,
    index: int,
    offset: int,
    first_part_cut: int,
    last_part_cut: int,
    part_count: int,
    chunk_size: int,
) -> Union[str, None]:
    """Stream the file from Telegram's servers in fixed-size chunks.

    The first and last chunks are trimmed with ``first_part_cut`` /
    ``last_part_cut`` so the stream honours an HTTP byte range; middle
    chunks are yielded whole. The workload counter for client ``index``
    is held for the lifetime of the stream.

    NOTE(review): the declared return annotation is ``Union[str, None]``,
    but this is an async generator yielding ``bytes`` chunks — confirm
    before tightening the annotation.
    """
    client = self.client
    WORK_LOADS[index] += 1  # account this stream against the client's load
    logging.debug(f"Starting to yielding file with client {index}.")
    media_session = await self.generate_media_session(client, file_id)

    part_no = 1

    location = await self.get_location(file_id)

    try:
        response = await media_session.invoke(
            raw.functions.upload.GetFile(location=location,
                                         offset=offset,
                                         limit=chunk_size), )
        if isinstance(response, raw.types.upload.File):
            while True:
                chunk = response.bytes
                if not chunk:
                    break

                # Trim the edges of the requested byte range.
                if part_count == 1:
                    yield chunk[first_part_cut:last_part_cut]
                elif part_no == 1:
                    yield chunk[first_part_cut:]
                elif part_no == part_count:
                    yield chunk[:last_part_cut]
                else:
                    yield chunk

                part_no += 1
                offset += chunk_size

                if part_no > part_count:
                    break

                response = await media_session.invoke(
                    raw.functions.upload.GetFile(location=location,
                                                 offset=offset,
                                                 limit=chunk_size), )
    except (TimeoutError, AttributeError):
        # Best-effort streaming: a timed-out or malformed response simply
        # ends the stream.
        pass
    finally:
        logging.debug(f"Finished yielding file with {part_no} parts.")
        WORK_LOADS[index] -= 1
213
+
214
async def clean_cache(self) -> None:
    """Periodically purge the FileId cache to bound memory usage.

    Runs forever, sleeping ``self.clean_timer`` seconds between purges.
    """
    while True:
        await asyncio.sleep(self.clean_timer)
        # Use the module logger instead of print(), and avoid dumping the
        # entire cache dict into server output — record the size instead.
        logging.info("Clearing %d cached file ID(s)",
                     len(self.cached_file_ids))
        self.cached_file_ids.clear()
        logging.debug("Cleaned the cache")