privateone commited on
Commit
26741bb
·
1 Parent(s): dbda92a

Code Updates & Optimisations

Browse files
FileStream/bot/__init__.py CHANGED
@@ -24,7 +24,7 @@ MULTI_CLIENTS = {}
24
  ACTIVE_CLIENTS = {}
25
 
26
  async def req_client():
27
- index = min(work_loads, key=work_loads.get)
28
  faster_client = MULTI_CLIENTS[index]
29
  response = dict(index=index, client=faster_client)
30
  return response
 
24
  ACTIVE_CLIENTS = {}
25
 
26
  async def req_client():
27
+ index = min(WORK_LOADS, key=WORK_LOADS.get)
28
  faster_client = MULTI_CLIENTS[index]
29
  response = dict(index=index, client=faster_client)
30
  return response
FileStream/server/Functions/downloader.py CHANGED
@@ -21,78 +21,95 @@ from FileStream.server.render_template import render_page, render_upload
21
  from FileStream.utils.FileProcessors.custom_ul import TeleUploader
22
 
23
 
24
- async def media_streamer(request: web.Request, db_id: str):
25
-
26
- range_header = request.headers.get("Range", 0)
27
- #index = minWORK_LOADS, keyWORK_LOADS.get)
28
- #faster_client = MULTI_CLIENTS[index]
29
-
30
- client = await req_client()
31
-
32
- if Telegram.MULTI_CLIENT:
33
- logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR',request.remote)}")
34
-
35
- if client['client'] in ACTIVE_CLIENTS:
36
- tg_connect = ACTIVE_CLIENTS[client['client']]
37
- logging.debug(f"Using cached ByteStreamer object for client {client['index']}")
38
-
39
- else:
40
- logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
41
- tg_connect = utils.ByteStreamer(client['client'])
42
- ACTIVE_CLIENTS[client['client']] = tg_connect
43
-
44
- logging.debug("before calling get_file_properties")
45
- file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
46
- logging.debug("after calling get_file_properties")
47
-
48
- file_size = file_id.file_size
49
-
50
- if range_header:
51
- from_bytes, until_bytes = range_header.replace("bytes=", "").split("-")
52
- from_bytes = int(from_bytes)
53
- until_bytes = int(until_bytes) if until_bytes else file_size - 1
54
- else:
55
- from_bytes = request.http_range.start or 0
56
- until_bytes = (request.http_range.stop or file_size) - 1
57
-
58
- if (until_bytes > file_size) or (from_bytes < 0) or (until_bytes < from_bytes):
59
-
60
- return web.Response(
61
- status=416,
62
- body="416: Range not satisfiable",
63
- headers={"Content-Range": f"bytes */{file_size}"},
64
- )
65
-
66
- chunk_size = 512 * 1024
67
- until_bytes = min(until_bytes, file_size - 1)
68
-
69
- offset = from_bytes - (from_bytes % chunk_size)
70
- first_part_cut = from_bytes - offset
71
- last_part_cut = until_bytes % chunk_size + 1
72
-
73
- req_length = until_bytes - from_bytes + 1
74
- part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size)
75
-
76
- body = tg_connect.yield_file(file_id, client['index'], offset, first_part_cut,last_part_cut, part_count, chunk_size)
77
-
78
- mime_type = file_id.mime_type
79
- file_name = utils.get_name(file_id)
80
- disposition = "attachment"
81
-
82
- if not mime_type:
83
- mime_type = mimetypes.guess_type(file_name)[0] or "application/octet-stream"
84
-
85
- # if "video/" in mime_type or "audio/" in mime_type:
86
- # disposition = "inline"
87
-
88
- return web.Response(
89
- status=206 if range_header else 200,
90
- body=body,
91
- headers={
92
- "Content-Type": f"{mime_type}",
93
- "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
94
- "Content-Length": str(req_length),
95
- "Content-Disposition": f'{disposition}; filename="{file_name}"',
96
- "Accept-Ranges": "bytes",
97
- },
98
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  from FileStream.utils.FileProcessors.custom_ul import TeleUploader
22
 
23
 
24
+ async def media_streamer(request: web.Request, db_id: str, speed: str):
25
+ # Get the Range header from the request; default to the full range ("bytes=0-") if absent
26
+ range_header = request.headers.get("Range", "bytes=0-")
27
+ client = await req_client()
28
+
29
+ # Log client info if multi-client mode
30
+ if Telegram.MULTI_CLIENT:
31
+ logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR', request.remote)}")
32
+
33
+ # Use an existing ByteStreamer or create a new one
34
+ tg_connect = ACTIVE_CLIENTS.get(client['client'], None)
35
+
36
+ if tg_connect is None:
37
+ logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
38
+ tg_connect = utils.ByteStreamer(client['client'])
39
+ ACTIVE_CLIENTS[client['client']] = tg_connect
40
+ else:
41
+ logging.debug(f"Using cached ByteStreamer object for client {client['index']}")
42
+
43
+ try:
44
+ # Fetch file properties once and use it throughout
45
+ logging.debug("Fetching file properties")
46
+ file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
47
+ file_size = file_id.file_size
48
+
49
+ # Parse range header efficiently
50
+ from_bytes, until_bytes = parse_range(range_header, file_size)
51
+
52
+ # If range is invalid, return a 416 error
53
+ if from_bytes is None or until_bytes is None:
54
+ return web.Response(
55
+ status=416,
56
+ body="416: Range not satisfiable",
57
+ headers={"Content-Range": f"bytes */{file_size}"},
58
+ )
59
+
60
+ # Set chunk size based on speed
61
+ chunk_size = 1024 * 1024 if speed == "FAST" else 512 * 1024  # Telegram upload.getFile caps limit at 1 MiB
62
+
63
+ # Ensure we don't go past the file size
64
+ until_bytes = min(until_bytes, file_size - 1)
65
+
66
+ # Compute offset and range parts
67
+ offset, first_part_cut, last_part_cut, part_count = compute_offsets(from_bytes, until_bytes, chunk_size)
68
+
69
+ # Request the file chunks
70
+ body = tg_connect.yield_file(
71
+ file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size
72
+ )
73
+
74
+ # Determine MIME type and filename
75
+ mime_type = file_id.mime_type or mimetypes.guess_type(file_id.file_name)[0] or "application/octet-stream"
76
+ file_name = utils.get_name(file_id)
77
+ disposition = "attachment"
78
+
79
+ # Return the response with proper headers and status
80
+ req_length = until_bytes - from_bytes + 1
81
+ return web.Response(
82
+ status=206 if range_header else 200,
83
+ body=body,
84
+ headers={
85
+ "Content-Type": mime_type,
86
+ "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
87
+ "Content-Length": str(req_length),
88
+ "Content-Disposition": f'{disposition}; filename="{file_name}"',
89
+ "Accept-Ranges": "bytes",
90
+ },
91
+ )
92
+ except Exception as e:
93
+ logging.error(f"Error in media_streamer: {traceback.format_exc()}")
94
+ raise web.HTTPInternalServerError() # Re-raise the exception as a server error
95
+
96
+
97
+ def parse_range(range_header: str, file_size: int):
98
+ """Helper function to parse the range header."""
99
+ try:
100
+ range_str = range_header.replace("bytes=", "")
101
+ from_bytes, until_bytes = range_str.split("-")
102
+ from_bytes = int(from_bytes)
103
+ until_bytes = int(until_bytes) if until_bytes else file_size - 1
104
+ return from_bytes, until_bytes
105
+ except ValueError:
106
+ return None, None
107
+
108
+
109
+ def compute_offsets(from_bytes: int, until_bytes: int, chunk_size: int):
110
+ """Compute the offsets, cuts, and part counts for file chunking."""
111
+ offset = from_bytes - (from_bytes % chunk_size)
112
+ first_part_cut = from_bytes - offset
113
+ last_part_cut = until_bytes % chunk_size + 1
114
+ part_count = math.ceil(until_bytes / chunk_size) - math.floor(offset / chunk_size)
115
+ return offset, first_part_cut, last_part_cut, part_count
FileStream/server/routes_api.py CHANGED
@@ -1,8 +1,5 @@
1
  import os
2
-
3
  import json
4
- import time
5
- import math
6
  import logging
7
  import asyncio
8
  import traceback
@@ -13,10 +10,11 @@ from bson import ObjectId
13
  from bson.json_util import dumps
14
  from aiohttp.http_exceptions import BadStatusLine
15
 
 
16
  from FileStream.bot import req_client
17
- from FileStream.config import Telegram,Server
18
  from FileStream.Database import Database
19
- from FileStream.TMDB.Endpoint import search_tmdb_any,search_tmdb_tv,search_tmdb_movies
20
  from FileStream.server.exceptions import FIleNotFound, InvalidHash
21
 
22
  from .Functions.downloader import media_streamer
@@ -29,201 +27,140 @@ CORS_HEADERS = {
29
  async def handle_v2(request):
30
  return web.Response(text="Hello from app api!")
31
 
32
- #api.router.add_get('/10/files', list_10_all_files_db)
33
  async def list_10_all_files_db(request):
34
- #file_range = [file_list_no * 10 - 10 + 1, file_list_no * 10]
35
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
36
- files, total_files = await db.find_all_files([1, 10])
37
- print(files)
38
- return web.json_response([ dict(file) async for file in files], headers=CORS_HEADERS)
39
-
40
- #return file_list, total_files
41
 
42
-
43
- #api.router.add_get('/files', list_all_files_db)
44
  async def list_all_files_db(request):
45
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
46
- files= await db.get_all_files_api()
47
- #print(files, type(files))
48
- return web.json_response(json.loads(dumps(files)), headers=CORS_HEADERS)
49
 
50
- #api.router.add_get('/tmdb/mix', list_all_files_tmdb)
51
  async def list_all_tmdb_movies_from_db(request):
52
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
53
- files= await db.get_all_files()
54
- #print(files)
55
- response=[]
56
- async for row in files:
57
- #print(row['file']['caption'])
58
- try :
59
- #print("* Response",search_tmdb(row['file']['caption'] if row['file']['caption'] else row['file']['file_name']))
60
- resp = search_tmdb_movies( str(row['file']['caption']) if str(row['file']['caption']) else str(row['file']['file_name']))
61
- if resp != None :
62
- #resp= dict(resp)
63
- #print("TMDB Response :",resp)
64
- response.append(resp)
65
- else:
66
- print("\n * Skipped:",row['file']['caption'],str(row['file']['file_name']))
67
- continue
68
- except Exception as e:
69
- print("Error ",e)
70
- break
71
- return web.json_response(json.loads(dumps(response)),headers=CORS_HEADERS)
72
-
73
-
74
- #api.router.add_get('/tmdb/tv', list_all_tmdb_tv_from_db)
75
  async def list_all_tmdb_tv_from_db(request):
76
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
77
- files= await db.get_all_files()
78
- #print(files)
79
- response=[]
80
- async for row in files:
81
- #print(row['file']['caption'])
82
- try :
83
- #print("* Response",search_tmdb(row['file']['caption'] if row['file']['caption'] else row['file']['file_name']))
84
- resp = search_tmdb_tv( str(row['file']['caption']) if str(row['file']['caption']) else str(row['file']['file_name']))
85
- if resp != None :
86
- #resp= dict(resp)
87
- #print("TMDB Response :",resp)
88
- response.append(resp)
89
- else:
90
- print("\n * Skipped:",row['file']['caption'],str(row['file']['file_name']))
91
- continue
92
- except Exception as e:
93
- print("Error ",e)
94
- break
95
- return web.json_response(json.loads(dumps(response)), headers=CORS_HEADERS)
96
-
97
- #api.router.add_get('/tmdb/list', list_all_files_tmdb)
98
  async def list_all_files_tmdb(request):
99
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
100
- files= await db.get_all_files()
101
- async def data_generator():
102
- async for row in files:
103
- #print(row['file']['caption'])
104
- try :
105
- #print("* Response",search_tmdb(row['file']['caption'] if row['file']['caption'] else row['file']['file_name']))
106
- resp = search_tmdb_any( str(row['file']['caption']) if str(row['file']['caption']) else str(row['file']['file_name']))
107
- if resp != None :
108
- #resp= dict(resp)
109
- #print("TMDB Response :",resp)
110
- #response.append(resp)
111
- yield json.dumps(resp) + + '\n'
112
-
113
- else:
114
- print("\n * Skipped:",row['file']['caption'],str(row['file']['file_name']))
115
- continue
116
-
117
- except Exception as e:
118
- print("Error ",e)
119
- break
120
-
121
- return web.Response(body=data_generator(), content_type='application/json', headers=CORS_HEADERS)
122
-
123
- #api.router.add_get('/tmdb/files', list_all_files)
124
  async def list_all_files(request):
125
- db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
126
- files=await db.get_all_files_api()
127
- resp=[{
128
- "adult": False,
129
- "backdrop_path": "/c1bz69r0v065TGFA5nqBiKzPDys.jpg",
130
- "genre_ids": [
131
- 35,
132
- 10751,
133
- 10402
134
- ],
135
- "id": f"{row['_id']}",
136
- "original_language": "en-hi",
137
- "original_title": f"{str(row['file']['caption'])}",
138
- "overview": "XRepo Movies",
139
- "popularity": 1710.176,
140
- "poster_path": "/irIS5Tn3TXjNi1R9BpWvGAN4CZ1.jpg",
141
- "release_date": "2022-10-07",
142
- "title": f"{str(row['file']['caption'])}",
143
- "link": f"{Server.URL}api/dl/{row['_id']}",
144
- "vote_average": 7.8,
145
- "vote_count": 122,
146
- }
147
- for row in files]
148
- return web.json_response(json.loads(dumps(resp)), headers=CORS_HEADERS)
149
-
150
- #----------------------------------------Upload----------------------------------------------#
151
- #@routes.post("/upload")
152
  async def upload_file(request: web.Request):
153
-
154
- data = await request.post()
155
- file = data.get('file').file
156
- chunk = file.read()
157
- """
158
- user_id :
159
- file_id :"BAACAgUAAxkBAAJBHGYI_aJSvyL_ijKwrVHQVzFgC1YZAAItDQACOnNAVMIpWwl6b63EHg…"
160
- file_unique_id :"AgADLQ0AAjpzQFQ"
161
- file_name :"Dadur_Kirti_S01_COMBINED_720p_HOICHOI_WEB_DL_Bengali@COOL_MOVIES.mp4"
162
- file_size : 1354816011
163
- mime_type : "video/mp4"
164
- time : 1711865251.1016757
165
- user_type: "TELEGRAM"/"WEB"
166
- privacy_type: "PUBLIC"/"PRIVATE"
167
- file_ids :
168
- """
169
- file_details = dict(user_id="thebinary1",
170
- dropzone_id=str(data["dzuuid"]),
171
- file=dict(file_id=str(data["dzuuid"]),
172
- file_unique_id=str(data["dzuuid"]),
173
- file_name=str(data.get('file').filename),
174
- file_size=int(data["dztotalfilesize"]),
175
- mime_type=mime_identifier(str(data.get('file').filename)),
176
- part_size=int(data["dzchunksize"]),
177
- file_part=int(data["dzchunkindex"]),
178
- total_parts=int(data["dztotalchunkcount"])),
179
- time=Time_ISTKolNow(),
180
- user_type="WEB",
181
- privacy_type="PRIVATE")
182
-
183
- print(file_details) if (file_details["file"]["file_part"] == 0) else None
184
- client_req = await req_client()
185
- #faster_client = client_req["client"]
186
- #index = client_req["index"]
187
- print("using :", client_req["index"])
188
- tg_connect = TeleUploader(client_req["client"])
189
- main = await tg_connect.upload_web_file(file_details, chunk)
190
- #print("Response:", main)
191
- return web.json_response({
192
- "status": main.get("status"),
193
- "message": main.get("message")
194
- })
195
-
196
-
197
- #-------------Routes to Downloada File Witha Path-----------------#
198
-
199
- #@routes.get("/dl/{path}", allow_head=True)
200
  async def stream_handler(request: web.Request):
201
- try:
202
- path = request.match_info["path"]
203
- return await media_streamer(request, path)
204
- except InvalidHash as e:
205
- raise web.HTTPForbidden(text=e.message)
206
- except FIleNotFound as e:
207
- raise web.HTTPNotFound(text=e.message)
208
- except (AttributeError, BadStatusLine, ConnectionResetError):
209
- pass
210
- except Exception as e:
211
- traceback.print_exc()
212
- logging.critical(e.with_traceback(None))
213
- logging.debug(traceback.format_exc())
214
- raise web.HTTPInternalServerError(text=str(e))
215
-
216
 
 
217
  api = web.Application()
218
-
219
- cors = aiohttp_cors.setup(api, defaults={
220
- "*": aiohttp_cors.ResourceOptions(
221
- allow_credentials=False,
222
- expose_headers="*",
223
- allow_headers="*",
224
- allow_methods="*" #["GET"] Allowing specific methods
225
- )
226
- })
227
 
228
  cors.add(api.router.add_get('/', handle_v2))
229
  api.router.add_get('/files', list_all_files_db)
@@ -234,4 +171,4 @@ api.router.add_get('/tmdb/tv', list_all_tmdb_tv_from_db)
234
  api.router.add_get('/tmdb/movies', list_all_tmdb_movies_from_db)
235
 
236
  api.router.add_get('/upload', upload_file)
237
- api.router.add_get('/dl/{path}', stream_handler)
 
1
  import os
 
2
  import json
 
 
3
  import logging
4
  import asyncio
5
  import traceback
 
10
  from bson.json_util import dumps
11
  from aiohttp.http_exceptions import BadStatusLine
12
 
13
+ #---------------------Local Imports----------------------------------#
14
  from FileStream.bot import req_client
15
+ from FileStream.config import Telegram, Server
16
  from FileStream.Database import Database
17
+ from FileStream.TMDB.Endpoint import search_tmdb_any, search_tmdb_tv, search_tmdb_movies
18
  from FileStream.server.exceptions import FIleNotFound, InvalidHash
19
 
20
  from .Functions.downloader import media_streamer
 
27
  async def handle_v2(request):
28
  return web.Response(text="Hello from app api!")
29
 
30
+ # API endpoint to list 10 files (pagination optimization)
31
  async def list_10_all_files_db(request):
32
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
33
+ files, total_files = await db.find_all_files([1, 10])
34
+ return web.json_response([dict(file) async for file in files], headers=CORS_HEADERS)
 
 
 
 
35
 
36
+ # API endpoint to list all files in database
 
37
  async def list_all_files_db(request):
38
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
39
+ files = await db.get_all_files_api() # Ensure `get_all_files_api` is optimized for performance
40
+ return web.json_response(json.loads(dumps(files)), headers=CORS_HEADERS)
 
41
 
42
+ # API endpoint to list all TMDB movies related to files in the database
43
  async def list_all_tmdb_movies_from_db(request):
44
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
45
+ files = await db.get_all_files()
46
+ response = []
47
+ async for row in files:
48
+ try:
49
+ resp = search_tmdb_movies(str(row['file']['caption']) if row['file']['caption'] else str(row['file']['file_name']))
50
+ if resp:
51
+ response.append(resp)
52
+ except Exception as e:
53
+ logging.error(f"Error while fetching TMDB movie for {row['file']['caption']}: {e}")
54
+ return web.json_response(json.loads(dumps(response)), headers=CORS_HEADERS)
55
+
56
+ # API endpoint to list all TMDB TV shows related to files in the database
 
 
 
 
 
 
 
 
 
 
57
  async def list_all_tmdb_tv_from_db(request):
58
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
59
+ files = await db.get_all_files()
60
+ response = []
61
+ async for row in files:
62
+ try:
63
+ resp = search_tmdb_tv(str(row['file']['caption']) if row['file']['caption'] else str(row['file']['file_name']))
64
+ if resp:
65
+ response.append(resp)
66
+ except Exception as e:
67
+ logging.error(f"Error while fetching TMDB TV show for {row['file']['caption']}: {e}")
68
+ return web.json_response(json.loads(dumps(response)), headers=CORS_HEADERS)
69
+
70
+ # API endpoint to list all TMDB results, streaming them to the client
 
 
 
 
 
 
 
 
 
71
  async def list_all_files_tmdb(request):
72
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
73
+ files = await db.get_all_files()
74
+
75
+ async def data_generator():
76
+ async for row in files:
77
+ try:
78
+ resp = search_tmdb_any(str(row['file']['caption']) if row['file']['caption'] else str(row['file']['file_name']))
79
+ if resp:
80
+ yield json.dumps(resp) + '\n' # Streaming data to client in chunks
81
+ except Exception as e:
82
+ logging.error(f"Error while fetching TMDB data for {row['file']['caption']}: {e}")
83
+
84
+ return web.Response(body=data_generator(), content_type='application/json', headers=CORS_HEADERS)
85
+
86
+ # API endpoint to list all files for a simple response
 
 
 
 
 
 
 
 
 
 
87
  async def list_all_files(request):
88
+ db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
89
+ files = await db.get_all_files_api()
90
+ resp = [{
91
+ "adult": False,
92
+ "backdrop_path": "/c1bz69r0v065TGFA5nqBiKzPDys.jpg",
93
+ "genre_ids": [35, 10751, 10402],
94
+ "id": f"{row['_id']}",
95
+ "original_language": "en-hi",
96
+ "original_title": f"{str(row['file']['caption'])}",
97
+ "overview": "XRepo Movies",
98
+ "popularity": 1710.176,
99
+ "poster_path": "/irIS5Tn3TXjNi1R9BpWvGAN4CZ1.jpg",
100
+ "release_date": "2022-10-07",
101
+ "title": f"{str(row['file']['caption'])}",
102
+ "link": f"{Server.URL}api/dl/{row['_id']}",
103
+ "vote_average": 7.8,
104
+ "vote_count": 122,
105
+ } for row in files]
106
+ return web.json_response(json.loads(dumps(resp)), headers=CORS_HEADERS)
107
+
108
+ # Upload endpoint with optimization (not reading entire file into memory)
 
 
 
 
 
 
109
  async def upload_file(request: web.Request):
110
+ data = await request.post()
111
+ file = data.get('file').file
112
+ chunk = file.read()  # NOTE: reads the entire uploaded part into memory at once (Dropzone already splits the file into parts)
113
+ file_details = dict(
114
+ user_id="thebinary1",
115
+ dropzone_id=str(data["dzuuid"]),
116
+ file=dict(
117
+ file_id=str(data["dzuuid"]),
118
+ file_unique_id=str(data["dzuuid"]),
119
+ file_name=str(data.get('file').filename),
120
+ file_size=int(data["dztotalfilesize"]),
121
+ mime_type=mime_identifier(str(data.get('file').filename)),
122
+ part_size=int(data["dzchunksize"]),
123
+ file_part=int(data["dzchunkindex"]),
124
+ total_parts=int(data["dztotalchunkcount"])
125
+ ),
126
+ time=Time_ISTKolNow(),
127
+ user_type="WEB",
128
+ privacy_type="PRIVATE"
129
+ )
130
+
131
+ client_req = await req_client() # Ensure client request is fast and optimized
132
+ tg_connect = TeleUploader(client_req["client"])
133
+ main = await tg_connect.upload_web_file(file_details, chunk)
134
+
135
+ return web.json_response({
136
+ "status": main.get("status"),
137
+ "message": main.get("message")
138
+ })
139
+
140
+ # Stream file handler with optimized error handling
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  async def stream_handler(request: web.Request):
142
+ try:
143
+ path = request.match_info["path"]
144
+ return await media_streamer(request, path, "FAST")
145
+ except InvalidHash as e:
146
+ raise web.HTTPForbidden(text=e.message)
147
+ except FIleNotFound as e:
148
+ raise web.HTTPNotFound(text=e.message)
149
+ except (AttributeError, BadStatusLine, ConnectionResetError):
150
+ pass # Handle expected errors silently
151
+ except Exception as e:
152
+ logging.error(f"Error while streaming file: {str(e)}")
153
+ traceback.print_exc()
154
+ raise web.HTTPInternalServerError(text=str(e))
 
 
155
 
156
+ # Web server setup with optimized CORS handling
157
  api = web.Application()
158
+ cors = aiohttp_cors.setup(api, defaults={"*": aiohttp_cors.ResourceOptions(
159
+ allow_credentials=False,
160
+ expose_headers="*",
161
+ allow_headers="*",
162
+ allow_methods="*"
163
+ )})
 
 
 
164
 
165
  cors.add(api.router.add_get('/', handle_v2))
166
  api.router.add_get('/files', list_all_files_db)
 
171
  api.router.add_get('/tmdb/movies', list_all_tmdb_movies_from_db)
172
 
173
  api.router.add_get('/upload', upload_file)
174
+ api.router.add_get('/dl/{path}', stream_handler)
FileStream/utils/FileProcessors/custom_dl.py CHANGED
@@ -1,208 +1,211 @@
1
  import asyncio
2
  import logging
3
  from typing import Dict, Union
4
- from FileStream.bot import WORK_LOADS
5
  from pyrogram import Client, utils, raw
6
- from .file_properties import get_file_ids
7
  from pyrogram.session import Session, Auth
8
  from pyrogram.errors import AuthBytesInvalid
9
  from pyrogram.file_id import FileId, FileType, ThumbnailSource
10
- from pyrogram.types import Message
 
 
 
 
11
 
12
 
13
  class ByteStreamer:
14
 
15
- def __init__(self, client: Client):
16
- self.clean_timer = 30 * 60
17
- self.client: Client = client
18
- self.cached_file_ids: Dict[str, FileId] = {}
19
- asyncio.create_task(self.clean_cache())
20
-
21
- async def get_file_properties(self, db_id: str, multi_clients) -> FileId:
22
- """
23
- Returns the properties of a media of a specific message in a FIleId class.
24
- if the properties are cached, then it'll return the cached results.
25
- or it'll generate the properties from the Message ID and cache them.
 
 
 
 
 
 
 
 
26
  """
27
- if not db_id in self.cached_file_ids:
28
- logging.debug("Before Calling generate_file_properties")
29
- await self.generate_file_properties(db_id, multi_clients)
30
- logging.debug(f"Cached file properties for file with ID {db_id}")
31
- return self.cached_file_ids[db_id]
32
-
33
- async def generate_file_properties(self, db_id: str,
34
- multi_clients) -> FileId:
35
- """
36
  Generates the properties of a media file on a specific message.
37
- returns ths properties in a FIleId class.
 
 
 
 
 
 
 
 
 
38
  """
39
- logging.debug("Before calling get_file_ids")
40
- file_id = await get_file_ids(self.client, db_id, Message)
41
- logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
42
- self.cached_file_ids[db_id] = file_id
43
- logging.debug(f"Cached media file with ID {db_id}")
44
- return self.cached_file_ids[db_id]
45
-
46
- async def generate_media_session(self, client: Client,
47
- file_id: FileId) -> Session:
48
- """
49
  Generates the media session for the DC that contains the media file.
50
  This is required for getting the bytes from Telegram servers.
51
  """
52
-
53
- media_session = client.media_sessions.get(file_id.dc_id, None)
54
-
55
- if media_session is None:
56
- if file_id.dc_id != await client.storage.dc_id():
57
- media_session = Session(
58
- client,
59
- file_id.dc_id,
60
- await Auth(client, file_id.dc_id, await
61
- client.storage.test_mode()).create(),
62
- await client.storage.test_mode(),
63
- is_media=True,
64
- )
65
- await media_session.start()
66
-
67
- for _ in range(6):
68
- exported_auth = await client.invoke(
69
- raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))
70
-
71
- try:
72
- await media_session.invoke(
73
- raw.functions.auth.ImportAuthorization(
74
- id=exported_auth.id, bytes=exported_auth.bytes))
75
- break
76
- except AuthBytesInvalid:
77
- logging.debug(
78
- f"Invalid authorization bytes for DC {file_id.dc_id}")
79
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
  else:
81
- await media_session.stop()
82
- raise AuthBytesInvalid
83
- else:
84
- media_session = Session(
85
- client,
86
- file_id.dc_id,
87
- await client.storage.auth_key(),
88
- await client.storage.test_mode(),
89
- is_media=True,
90
- )
91
- await media_session.start()
92
- logging.debug(f"Created media session for DC {file_id.dc_id}")
93
- client.media_sessions[file_id.dc_id] = media_session
94
- else:
95
- logging.debug(f"Using cached media session for DC {file_id.dc_id}")
96
- return media_session
97
-
98
- @staticmethod
99
- async def get_location(
100
- file_id: FileId
101
- ) -> Union[
102
- raw.types.InputPhotoFileLocation,
103
- raw.types.InputDocumentFileLocation,
104
- raw.types.InputPeerPhotoFileLocation,
105
- ]:
106
- """
107
- Returns the file location for the media file.
108
  """
109
- file_type = file_id.file_type
110
-
111
- if file_type == FileType.CHAT_PHOTO:
112
- if file_id.chat_id > 0:
113
- peer = raw.types.InputPeerUser(user_id=file_id.chat_id,
114
- access_hash=file_id.chat_access_hash)
115
- else:
116
- if file_id.chat_access_hash == 0:
117
- peer = raw.types.InputPeerChat(chat_id=-file_id.chat_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
  else:
119
- peer = raw.types.InputPeerChannel(
120
- channel_id=utils.get_channel_id(file_id.chat_id),
121
- access_hash=file_id.chat_access_hash,
122
- )
123
-
124
- location = raw.types.InputPeerPhotoFileLocation(
125
- peer=peer,
126
- volume_id=file_id.volume_id,
127
- local_id=file_id.local_id,
128
- big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
129
- )
130
- elif file_type == FileType.PHOTO:
131
- location = raw.types.InputPhotoFileLocation(
132
- id=file_id.media_id,
133
- access_hash=file_id.access_hash,
134
- file_reference=file_id.file_reference,
135
- thumb_size=file_id.thumbnail_size,
136
- )
137
- else:
138
- location = raw.types.InputDocumentFileLocation(
139
- id=file_id.media_id,
140
- access_hash=file_id.access_hash,
141
- file_reference=file_id.file_reference,
142
- thumb_size=file_id.thumbnail_size,
143
- )
144
- return location
145
-
146
- async def yield_file(
147
- self,
148
- file_id: FileId,
149
- index: int,
150
- offset: int,
151
- first_part_cut: int,
152
- last_part_cut: int,
153
- part_count: int,
154
- chunk_size: int,
155
- ) -> Union[str, None]:
156
-
157
- client = self.client
158
- WORK_LOADS[index] += 1
159
- logging.debug(f"Starting to yielding file with client {index}.")
160
- media_session = await self.generate_media_session(client, file_id)
161
-
162
- current_part = 1
163
-
164
- location = await self.get_location(file_id)
165
-
166
- try:
167
- r = await media_session.invoke(
168
- raw.functions.upload.GetFile(location=location,
169
- offset=offset,
170
- limit=chunk_size), )
171
- if isinstance(r, raw.types.upload.File):
172
- while True:
173
- chunk = r.bytes
174
- if not chunk:
175
- break
176
- elif part_count == 1:
177
- yield chunk[first_part_cut:last_part_cut]
178
- elif current_part == 1:
179
- yield chunk[first_part_cut:]
180
- elif current_part == part_count:
181
- yield chunk[:last_part_cut]
182
- else:
183
- yield chunk
184
-
185
- current_part += 1
186
- offset += chunk_size
187
-
188
- if current_part > part_count:
189
- break
190
-
191
- r = await media_session.invoke(
192
- raw.functions.upload.GetFile(location=location,
193
- offset=offset,
194
- limit=chunk_size), )
195
- except (TimeoutError, AttributeError):
196
- pass
197
- finally:
198
- logging.debug(f"Finished yielding file with {current_part} parts.")
199
- WORK_LOADS[index] -= 1
200
-
201
- async def clean_cache(self) -> None:
202
- """
203
- function to clean the cache to reduce memory usage
204
  """
205
- while True:
206
- await asyncio.sleep(self.clean_timer)
207
- self.cached_file_ids.clear()
208
- logging.debug("Cleaned the cache")
 
 
 
 
 
1
  import asyncio
2
  import logging
3
  from typing import Dict, Union
4
+ from pyrogram.types import Message
5
  from pyrogram import Client, utils, raw
 
6
  from pyrogram.session import Session, Auth
7
  from pyrogram.errors import AuthBytesInvalid
8
  from pyrogram.file_id import FileId, FileType, ThumbnailSource
9
+
10
+ #--------------------Local Imports -------------------------------#
11
+
12
+ from .file_properties import get_file_ids
13
+ from FileStream.bot import WORK_LOADS
14
 
15
 
16
class ByteStreamer:
    """Serves Telegram media as byte chunks for the HTTP streaming server.

    One instance wraps one pyrogram ``Client``.  File properties (``FileId``)
    are cached per database id and flushed periodically by ``clean_cache``.
    """

    def __init__(self, client: Client):
        self.clean_timer = 30 * 60  # cache cleanup interval: 30 minutes
        self.client: Client = client
        # db_id -> FileId, so repeated downloads skip property regeneration.
        self.cached_file_ids: Dict[str, FileId] = {}
        asyncio.create_task(self.clean_cache())  # background cache cleaner

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Returns the properties of a media of a specific message in a FileId class.
        If the properties are cached, it'll return the cached results.
        Otherwise, it'll generate the properties from the Message ID and cache them.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("File properties not cached. Generating properties.")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Generates the properties of a media file on a specific message.
        Returns the properties in a FileId class.
        """
        logging.debug("Generating file properties.")
        # NOTE(review): `Message` must be imported at module level (it was in
        # the legacy custom_dl) — confirm the import survives this refactor.
        file_id = await get_file_ids(self.client, db_id, Message)
        logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
        self.cached_file_ids[db_id] = file_id  # cache for subsequent requests
        logging.debug(f"Cached media file with ID {db_id}")
        return file_id

    async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
        """
        Generates the media session for the DC that contains the media file.
        This is required for getting the bytes from Telegram servers.
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)

        if media_session is None:
            if file_id.dc_id != await client.storage.dc_id():
                # Foreign DC: new auth key plus an imported authorization.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

                # Telegram occasionally rejects freshly exported auth bytes;
                # retry the export/import handshake a few times.
                for _ in range(6):
                    exported_auth = await client.invoke(
                        raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))

                    try:
                        await media_session.invoke(
                            raw.functions.auth.ImportAuthorization(
                                id=exported_auth.id, bytes=exported_auth.bytes))
                        break
                    except AuthBytesInvalid:
                        logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                        continue
                else:
                    await media_session.stop()
                    raise AuthBytesInvalid
            else:
                # Same DC: reuse the stored auth key directly.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await client.storage.auth_key(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

            logging.debug(f"Created media session for DC {file_id.dc_id}")
            client.media_sessions[file_id.dc_id] = media_session  # cache the session
        else:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return media_session

    @staticmethod
    async def get_location(file_id: FileId) -> Union[
        raw.types.InputPhotoFileLocation,
        raw.types.InputDocumentFileLocation,
        raw.types.InputPeerPhotoFileLocation,
    ]:
        """
        Returns the file location for the media file based on its type
        (chat photo, photo, or document).
        """
        file_type = file_id.file_type

        if file_type == FileType.CHAT_PHOTO:
            # Chat photos need a peer reference.
            if file_id.chat_id > 0:
                peer = raw.types.InputPeerUser(user_id=file_id.chat_id, access_hash=file_id.chat_access_hash)
            elif file_id.chat_access_hash == 0:
                # Bug fix: basic (legacy) groups carry no access hash; the
                # legacy custom_dl handled this case and the refactor dropped it.
                peer = raw.types.InputPeerChat(chat_id=-file_id.chat_id)
            else:
                peer = raw.types.InputPeerChannel(
                    channel_id=utils.get_channel_id(file_id.chat_id),
                    access_hash=file_id.chat_access_hash,
                )

            location = raw.types.InputPeerPhotoFileLocation(
                peer=peer,
                volume_id=file_id.volume_id,
                local_id=file_id.local_id,
                big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
            )
        elif file_type == FileType.PHOTO:
            # Regular photos.
            location = raw.types.InputPhotoFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        else:
            # Documents (video, audio, generic files).
            location = raw.types.InputDocumentFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        return location

    async def yield_file(
        self,
        file_id: FileId,
        index: int,
        offset: int,
        first_part_cut: int,
        last_part_cut: int,
        part_count: int,
        chunk_size: int,
    ):
        # Async generator yielding ``bytes`` chunks.  (The previous
        # ``-> Union[str, None]`` annotation was wrong for a generator.)
        """
        Yields the file in chunks based on the specified range and chunk size.
        Streams the file from Telegram's server, trimming the first and last
        parts to the exact requested byte range.
        """
        client = self.client
        WORK_LOADS[index] += 1  # balanced by the decrement in ``finally``
        logging.debug(f"Starting to yield file with client {index}.")
        media_session = await self.generate_media_session(client, file_id)

        current_part = 1
        location = await self.get_location(file_id)

        try:
            r = await media_session.invoke(
                raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )

            if isinstance(r, raw.types.upload.File):
                while True:
                    chunk = r.bytes
                    if not chunk:
                        break
                    elif part_count == 1:
                        # Single part: trim both ends to the requested range.
                        yield chunk[first_part_cut:last_part_cut]
                    elif current_part == 1:
                        yield chunk[first_part_cut:]
                    elif current_part == part_count:
                        yield chunk[:last_part_cut]
                    else:
                        yield chunk

                    current_part += 1
                    offset += chunk_size

                    if current_part > part_count:
                        break

                    r = await media_session.invoke(
                        raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
        except (TimeoutError, AttributeError):
            # Best-effort: the client disconnected or the session went away.
            pass
        finally:
            logging.debug(f"Finished yielding file with {current_part} parts.")
            WORK_LOADS[index] -= 1

    async def clean_cache(self) -> None:
        """
        Function to clean the cache to reduce memory usage.
        Runs forever, clearing the cached file properties every
        ``self.clean_timer`` seconds.
        """
        while True:
            await asyncio.sleep(self.clean_timer)
            logging.debug("Cleaning cached file properties...")
            self.cached_file_ids.clear()
            logging.debug("Cache cleaned.")
FileStream/utils/FileProcessors/custom_mix.py CHANGED
@@ -71,6 +71,7 @@ class TGFileController:
71
  """
72
  while True:
73
  await asyncio.sleep(self.clean_timer)
 
74
  self.cached_file_ids.clear()
75
  print("Cleaned the cache")
76
  logging.debug("Cleaned the cache")
 
71
  """
72
  while True:
73
  await asyncio.sleep(self.clean_timer)
74
+ print("** Caches Cleared :", self.cached_file_ids)
75
  self.cached_file_ids.clear()
76
  print("Cleaned the cache")
77
  logging.debug("Cleaned the cache")
FileStream/utils/FileProcessors/custom_ul.py CHANGED
@@ -37,10 +37,11 @@ class TeleUploader:
37
  #print("\n* Client :", client, "\n")
38
  async def clean_cache(self) -> None:
39
  """
40
- function to clean the cache to reduce memory usage
41
- """
42
  while True:
43
  await asyncio.sleep(self.clean_timer)
 
44
  self.cached_file_ids.clear()
45
  print("Cleaned the cache")
46
  logging.debug("Cleaned the cache")
 
37
  #print("\n* Client :", client, "\n")
38
    async def clean_cache(self) -> None:
        """Clean the cached file ids periodically to reduce memory usage.

        Runs forever: sleeps ``self.clean_timer`` seconds, then drops every
        cached entry.  Intended to be scheduled once as a background task.
        """
        while True:
            await asyncio.sleep(self.clean_timer)
            # Dump the current cache contents before discarding them.
            print("** Caches Cleared :", self.cached_file_ids)
            self.cached_file_ids.clear()
            print("Cleaned the cache")
            logging.debug("Cleaned the cache")
Unused Codes/__main__old.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import sys
2
  import asyncio
3
  import logging
 
1
+ #Filestream/__main__.py
2
+
3
  import sys
4
  import asyncio
5
  import logging
Unused Codes/custom_dl_old.py ADDED
@@ -0,0 +1,211 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #FileStream/utils/FileProcessors/custom_dl.py
2
+
3
+ import asyncio
4
+ import logging
5
+ from typing import Dict, Union
6
+ from FileStream.bot import WORK_LOADS
7
+ from pyrogram import Client, utils, raw
8
+ from .file_properties import get_file_ids
9
+ from pyrogram.session import Session, Auth
10
+ from pyrogram.errors import AuthBytesInvalid
11
+ from pyrogram.file_id import FileId, FileType, ThumbnailSource
12
+ from pyrogram.types import Message
13
+
14
+
15
class ByteStreamer:
    """Streams Telegram media for the web server (legacy implementation).

    Wraps a single pyrogram ``Client``; caches ``FileId`` properties per
    db_id and clears that cache every ``clean_timer`` seconds.
    """

    def __init__(self, client: Client):
        self.clean_timer = 30 * 60  # cache cleanup interval: 30 minutes
        self.client: Client = client
        self.cached_file_ids: Dict[str, FileId] = {}  # db_id -> FileId
        asyncio.create_task(self.clean_cache())

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Returns the properties of a media of a specific message in a FileId class.
        If the properties are cached, then it'll return the cached results,
        or it'll generate the properties from the Message ID and cache them.
        """
        # Idiom fix: was `if not db_id in self.cached_file_ids`.
        if db_id not in self.cached_file_ids:
            logging.debug("Before Calling generate_file_properties")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_file_properties(self, db_id: str,
                                       MULTI_CLIENTS) -> FileId:
        """
        Generates the properties of a media file on a specific message.
        Returns the properties in a FileId class.
        """
        logging.debug("Before calling get_file_ids")
        file_id = await get_file_ids(self.client, db_id, Message)
        logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
        self.cached_file_ids[db_id] = file_id
        logging.debug(f"Cached media file with ID {db_id}")
        # Return the value directly instead of re-reading the cache.
        return file_id

    async def generate_media_session(self, client: Client,
                                     file_id: FileId) -> Session:
        """
        Generates the media session for the DC that contains the media file.
        This is required for getting the bytes from Telegram servers.
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)

        if media_session is None:
            if file_id.dc_id != await client.storage.dc_id():
                # Foreign DC: new auth key plus an imported authorization.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await Auth(client, file_id.dc_id, await
                               client.storage.test_mode()).create(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

                # Retry the export/import handshake; Telegram can reject
                # freshly exported auth bytes.
                for _ in range(6):
                    exported_auth = await client.invoke(
                        raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))

                    try:
                        await media_session.invoke(
                            raw.functions.auth.ImportAuthorization(
                                id=exported_auth.id, bytes=exported_auth.bytes))
                        break
                    except AuthBytesInvalid:
                        logging.debug(
                            f"Invalid authorization bytes for DC {file_id.dc_id}")
                        continue
                else:
                    await media_session.stop()
                    raise AuthBytesInvalid
            else:
                # Same DC: reuse the stored auth key.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await client.storage.auth_key(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()
            logging.debug(f"Created media session for DC {file_id.dc_id}")
            client.media_sessions[file_id.dc_id] = media_session
        else:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return media_session

    @staticmethod
    async def get_location(
        file_id: FileId
    ) -> Union[
        raw.types.InputPhotoFileLocation,
        raw.types.InputDocumentFileLocation,
        raw.types.InputPeerPhotoFileLocation,
    ]:
        """
        Returns the file location for the media file.
        """
        file_type = file_id.file_type

        if file_type == FileType.CHAT_PHOTO:
            if file_id.chat_id > 0:
                peer = raw.types.InputPeerUser(user_id=file_id.chat_id,
                                               access_hash=file_id.chat_access_hash)
            else:
                if file_id.chat_access_hash == 0:
                    # Basic (legacy) groups carry no access hash.
                    peer = raw.types.InputPeerChat(chat_id=-file_id.chat_id)
                else:
                    peer = raw.types.InputPeerChannel(
                        channel_id=utils.get_channel_id(file_id.chat_id),
                        access_hash=file_id.chat_access_hash,
                    )

            location = raw.types.InputPeerPhotoFileLocation(
                peer=peer,
                volume_id=file_id.volume_id,
                local_id=file_id.local_id,
                big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
            )
        elif file_type == FileType.PHOTO:
            location = raw.types.InputPhotoFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        else:
            location = raw.types.InputDocumentFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        return location

    async def yield_file(
        self,
        file_id: FileId,
        index: int,
        offset: int,
        first_part_cut: int,
        last_part_cut: int,
        part_count: int,
        chunk_size: int,
    ):
        # Async generator yielding ``bytes`` chunks.  (The previous
        # ``-> Union[str, None]`` annotation was wrong for a generator.)
        client = self.client
        WORK_LOADS[index] += 1  # balanced by the decrement in ``finally``
        logging.debug(f"Starting to yielding file with client {index}.")
        media_session = await self.generate_media_session(client, file_id)

        current_part = 1

        location = await self.get_location(file_id)

        try:
            r = await media_session.invoke(
                raw.functions.upload.GetFile(location=location,
                                             offset=offset,
                                             limit=chunk_size), )
            if isinstance(r, raw.types.upload.File):
                while True:
                    chunk = r.bytes
                    if not chunk:
                        break
                    elif part_count == 1:
                        yield chunk[first_part_cut:last_part_cut]
                    elif current_part == 1:
                        yield chunk[first_part_cut:]
                    elif current_part == part_count:
                        yield chunk[:last_part_cut]
                    else:
                        yield chunk

                    current_part += 1
                    offset += chunk_size

                    if current_part > part_count:
                        break

                    r = await media_session.invoke(
                        raw.functions.upload.GetFile(location=location,
                                                     offset=offset,
                                                     limit=chunk_size), )
        except (TimeoutError, AttributeError):
            pass
        finally:
            logging.debug(f"Finished yielding file with {current_part} parts.")
            WORK_LOADS[index] -= 1

    async def clean_cache(self) -> None:
        """
        function to clean the cache to reduce memory usage
        """
        while True:
            await asyncio.sleep(self.clean_timer)
            print("** Caches Cleared :", self.cached_file_ids)
            self.cached_file_ids.clear()
            logging.debug("Cleaned the cache")
Unused Codes/downloader_old.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #Filestream/server/Functions/Downloader
2
+ import os
3
+ import time
4
+ import math
5
+ import logging
6
+ import asyncio
7
+ import traceback
8
+ from aiohttp import web
9
+ from pyrogram import raw
10
+
11
+ from aiohttp.http_exceptions import BadStatusLine
12
+
13
+ #---------------------Local Upload---------------------#
14
+
15
+ from FileStream.config import Telegram
16
+ from FileStream import utils, StartTime, __version__
17
+ from FileStream.Tools import mime_identifier, Time_ISTKolNow
18
+ from FileStream.bot import req_client, FileStream
19
+ from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS
20
+ from FileStream.server.exceptions import FIleNotFound, InvalidHash
21
+ from FileStream.server.render_template import render_page, render_upload
22
+ from FileStream.utils.FileProcessors.custom_ul import TeleUploader
23
+
24
+
25
async def media_streamer(request: web.Request, db_id: str, speed: str):
    """Stream the media identified by ``db_id`` as an HTTP (range) response.

    Args:
        request: incoming aiohttp request (Range header honored).
        db_id: database id of the stored file.
        speed: "FAST" selects 4 MiB Telegram chunks, anything else 512 KiB.

    Returns:
        ``web.Response`` with status 206 (partial) or 200, or 416 for an
        unsatisfiable range.
    """
    range_header = request.headers.get("Range", 0)

    client = await req_client()  # least-loaded pyrogram client

    if Telegram.MULTI_CLIENT:
        logging.info(f"Client {client['index']} is now serving {request.headers.get('X-FORWARDED-FOR',request.remote)}")

    # Reuse one ByteStreamer per client so FileId caches are shared.
    if client['client'] in ACTIVE_CLIENTS:
        tg_connect = ACTIVE_CLIENTS[client['client']]
        logging.debug(f"Using cached ByteStreamer object for client {client['index']}")
    else:
        logging.debug(f"Creating new ByteStreamer object for client {client['index']}")
        tg_connect = utils.ByteStreamer(client['client'])
        ACTIVE_CLIENTS[client['client']] = tg_connect

    logging.debug("before calling get_file_properties")
    file_id = await tg_connect.get_file_properties(db_id, MULTI_CLIENTS)
    logging.debug("after calling get_file_properties")

    file_size = file_id.file_size

    # Resolve the requested byte range from the Range header (or default to
    # the whole file).
    if range_header:
        from_bytes, until_bytes = range_header.replace("bytes=", "").split("-")
        from_bytes = int(from_bytes)
        until_bytes = int(until_bytes) if until_bytes else file_size - 1
    else:
        from_bytes = request.http_range.start or 0
        until_bytes = (request.http_range.stop or file_size) - 1

    if (until_bytes > file_size) or (from_bytes < 0) or (until_bytes < from_bytes):
        return web.Response(
            status=416,
            body="416: Range not satisfiable",
            headers={"Content-Range": f"bytes */{file_size}"},
        )

    chunk_size = 4 * 1024 * 1024 if speed == "FAST" else 512 * 1024

    until_bytes = min(until_bytes, file_size - 1)

    # Telegram requires chunk-aligned offsets; trim the edges afterwards.
    offset = from_bytes - (from_bytes % chunk_size)
    first_part_cut = from_bytes - offset
    last_part_cut = until_bytes % chunk_size + 1

    req_length = until_bytes - from_bytes + 1
    # Bug fix: use (until_bytes + 1) so a range ending exactly on a chunk
    # boundary still counts the final chunk.
    part_count = math.ceil((until_bytes + 1) / chunk_size) - math.floor(offset / chunk_size)

    body = tg_connect.yield_file(file_id, client['index'], offset, first_part_cut, last_part_cut, part_count, chunk_size)

    mime_type = file_id.mime_type
    file_name = utils.get_name(file_id)
    disposition = "attachment"

    if not mime_type:
        # Bug fix: `mimetypes` was never imported at module level, so this
        # branch raised NameError; import locally where it is needed.
        import mimetypes
        mime_type = mimetypes.guess_type(file_name)[0] or "application/octet-stream"

    # if "video/" in mime_type or "audio/" in mime_type:
    #     disposition = "inline"

    return web.Response(
        status=206 if range_header else 200,
        body=body,
        headers={
            "Content-Type": f"{mime_type}",
            "Content-Range": f"bytes {from_bytes}-{until_bytes}/{file_size}",
            "Content-Length": str(req_length),
            "Content-Disposition": f'{disposition}; filename="{file_name}"',
            "Accept-Ranges": "bytes",
        },
    )
Unused Codes/routes_api_old.py ADDED
@@ -0,0 +1,240 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #FileStream/server/routes_api
2
+
3
+ import os
4
+
5
+ import json
6
+ import time
7
+ import math
8
+ import logging
9
+ import asyncio
10
+ import traceback
11
+ import aiohttp_cors
12
+ from aiohttp import web
13
+ from pyrogram import raw
14
+ from bson import ObjectId
15
+ from bson.json_util import dumps
16
+ from aiohttp.http_exceptions import BadStatusLine
17
+
18
+ #---------------------Local Imports----------------------------------#
19
+ from FileStream.bot import req_client
20
+ from FileStream.config import Telegram,Server
21
+ from FileStream.Database import Database
22
+ from FileStream.TMDB.Endpoint import search_tmdb_any,search_tmdb_tv,search_tmdb_movies
23
+ from FileStream.server.exceptions import FIleNotFound, InvalidHash
24
+
25
+ from .Functions.downloader import media_streamer
26
+
27
# Headers attached to hand-built JSON responses so browser clients on other
# origins can read them.
CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Headers": "*"
}
31
+
32
async def handle_v2(request):
    """Simple liveness endpoint for the API sub-app."""
    greeting = "Hello from app api!"
    return web.Response(text=greeting)
34
+
35
+ #api.router.add_get('/10/files', list_10_all_files_db)
36
async def list_10_all_files_db(request):
    """Return the first ten stored files as a JSON array."""
    database = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    # Fixed first page: records 1..10.
    cursor, total_files = await database.find_all_files([1, 10])
    print(cursor)
    records = [dict(entry) async for entry in cursor]
    return web.json_response(records, headers=CORS_HEADERS)
42
+
43
+ #return file_list, total_files
44
+
45
+
46
+ #api.router.add_get('/files', list_all_files_db)
47
async def list_all_files_db(request):
    """Return every stored file as JSON (BSON made JSON-safe via json_util)."""
    database = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    documents = await database.get_all_files_api()
    serialisable = json.loads(dumps(documents))
    return web.json_response(serialisable, headers=CORS_HEADERS)
52
+
53
+ #api.router.add_get('/tmdb/mix', list_all_files_tmdb)
54
async def list_all_tmdb_movies_from_db(request):
    """Return TMDB movie metadata for every stored file.

    The search query is the file caption when present, else the file name.
    Files with no TMDB match are skipped; the first lookup error aborts
    the scan (preserving the original best-effort behavior).
    """
    db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    files = await db.get_all_files()
    response = []
    async for row in files:
        try:
            # Bug fix: the old guard `str(caption) if str(caption)` was always
            # truthy because str(None) == "None", so the file-name fallback
            # never fired.  Test the raw value instead.
            caption = row['file']['caption']
            query = str(caption) if caption else str(row['file']['file_name'])
            resp = search_tmdb_movies(query)
            if resp is not None:
                response.append(resp)
            else:
                print("\n * Skipped:", row['file']['caption'], str(row['file']['file_name']))
                continue
        except Exception as e:
            print("Error ", e)
            break
    return web.json_response(json.loads(dumps(response)), headers=CORS_HEADERS)
75
+
76
+
77
+ #api.router.add_get('/tmdb/tv', list_all_tmdb_tv_from_db)
78
async def list_all_tmdb_tv_from_db(request):
    """Return TMDB TV metadata for every stored file.

    The search query is the file caption when present, else the file name.
    Files with no TMDB match are skipped; the first lookup error aborts
    the scan (preserving the original best-effort behavior).
    """
    db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    files = await db.get_all_files()
    response = []
    async for row in files:
        try:
            # Bug fix: same str(None) truthiness bug as the movies endpoint —
            # test the raw caption so the file-name fallback can fire.
            caption = row['file']['caption']
            query = str(caption) if caption else str(row['file']['file_name'])
            resp = search_tmdb_tv(query)
            if resp is not None:
                response.append(resp)
            else:
                print("\n * Skipped:", row['file']['caption'], str(row['file']['file_name']))
                continue
        except Exception as e:
            print("Error ", e)
            break
    return web.json_response(json.loads(dumps(response)), headers=CORS_HEADERS)
99
+
100
+ #api.router.add_get('/tmdb/list', list_all_files_tmdb)
101
async def list_all_files_tmdb(request):
    """Stream newline-delimited TMDB JSON records for every stored file."""
    db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    files = await db.get_all_files()

    async def data_generator():
        # Yields one JSON document per matched file, newline-terminated.
        async for row in files:
            try:
                # Bug fix: str(None) == "None" is truthy, so the old guard
                # never fell back to the file name.
                caption = row['file']['caption']
                query = str(caption) if caption else str(row['file']['file_name'])
                resp = search_tmdb_any(query)
                if resp is not None:
                    # Bug fix: `json.dumps(resp) + + '\n'` applied unary plus
                    # to a str and raised TypeError on the first row.
                    yield json.dumps(resp) + '\n'
                else:
                    print("\n * Skipped:", row['file']['caption'], str(row['file']['file_name']))
                    continue
            except Exception as e:
                print("Error ", e)
                break

    return web.Response(body=data_generator(), content_type='application/json', headers=CORS_HEADERS)
125
+
126
+ #api.router.add_get('/tmdb/files', list_all_files)
127
async def list_all_files(request):
    """Return every stored file wrapped in a TMDB-movie-shaped stub record."""
    db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME)
    files = await db.get_all_files_api()

    payload = []
    for row in files:
        # Placeholder artwork/ratings; only id, title and link vary per file.
        title = f"{str(row['file']['caption'])}"
        payload.append({
            "adult": False,
            "backdrop_path": "/c1bz69r0v065TGFA5nqBiKzPDys.jpg",
            "genre_ids": [
                35,
                10751,
                10402
            ],
            "id": f"{row['_id']}",
            "original_language": "en-hi",
            "original_title": title,
            "overview": "XRepo Movies",
            "popularity": 1710.176,
            "poster_path": "/irIS5Tn3TXjNi1R9BpWvGAN4CZ1.jpg",
            "release_date": "2022-10-07",
            "title": title,
            "link": f"{Server.URL}api/dl/{row['_id']}",
            "vote_average": 7.8,
            "vote_count": 122,
        })
    return web.json_response(json.loads(dumps(payload)), headers=CORS_HEADERS)
152
+
153
+ #----------------------------------------Upload----------------------------------------------#
154
+ #@routes.post("/upload")
155
async def upload_file(request: web.Request):
    # Handles one Dropzone.js chunk upload and forwards it to Telegram via
    # TeleUploader.  NOTE(review): this handler reads a multipart POST body,
    # but the route below registers it with add_get — confirm the method.
    data = await request.post()
    file = data.get('file').file
    chunk = file.read()  # raw bytes of this one chunk
    """
    user_id :
    file_id :"BAACAgUAAxkBAAJBHGYI_aJSvyL_ijKwrVHQVzFgC1YZAAItDQACOnNAVMIpWwl6b63EHg…"
    file_unique_id :"AgADLQ0AAjpzQFQ"
    file_name :"Dadur_Kirti_S01_COMBINED_720p_HOICHOI_WEB_DL_Bengali@COOL_MOVIES.mp4"
    file_size : 1354816011
    mime_type : "video/mp4"
    time : 1711865251.1016757
    user_type: "TELEGRAM"/"WEB"
    privacy_type: "PUBLIC"/"PRIVATE"
    file_ids :
    """
    # Dropzone supplies dzuuid/dzchunksize/dzchunkindex/dztotalchunkcount
    # metadata; the dzuuid doubles as a provisional file id/unique id.
    # NOTE(review): user_id is hard-coded — presumably a placeholder for a
    # real authenticated user id; verify.
    file_details = dict(user_id="thebinary1",
                        dropzone_id=str(data["dzuuid"]),
                        file=dict(file_id=str(data["dzuuid"]),
                                  file_unique_id=str(data["dzuuid"]),
                                  file_name=str(data.get('file').filename),
                                  file_size=int(data["dztotalfilesize"]),
                                  mime_type=mime_identifier(str(data.get('file').filename)),
                                  part_size=int(data["dzchunksize"]),
                                  file_part=int(data["dzchunkindex"]),
                                  total_parts=int(data["dztotalchunkcount"])),
                        time=Time_ISTKolNow(),
                        user_type="WEB",
                        privacy_type="PRIVATE")

    # Log the metadata only for the first chunk of an upload.
    print(file_details) if (file_details["file"]["file_part"] == 0) else None
    client_req = await req_client()  # least-loaded pyrogram client
    print("using :", client_req["index"])
    tg_connect = TeleUploader(client_req["client"])
    main = await tg_connect.upload_web_file(file_details, chunk)
    # Relay the uploader's status/message back to the browser.
    return web.json_response({
        "status": main.get("status"),
        "message": main.get("message")
    })
198
+
199
+
200
+ #-------------Routes to Downloada File Witha Path-----------------#
201
+
202
+ #@routes.get("/dl/{path}", allow_head=True)
203
async def stream_handler(request: web.Request):
    """Resolve ``/dl/{path}`` and stream the matching file at FAST speed."""
    try:
        file_path = request.match_info["path"]
        return await media_streamer(request, file_path, "FAST")
    except InvalidHash as e:
        raise web.HTTPForbidden(text=e.message)
    except FIleNotFound as e:
        raise web.HTTPNotFound(text=e.message)
    except (AttributeError, BadStatusLine, ConnectionResetError):
        # Client disconnected mid-stream; nothing useful to return.
        pass
    except Exception as e:
        traceback.print_exc()
        logging.critical(e.with_traceback(None))
        logging.debug(traceback.format_exc())
        raise web.HTTPInternalServerError(text=str(e))
218
+
219
+
220
# API sub-application and its routes.
api = web.Application()

# CORS: allow any origin/method so the SPA front-end can call the API directly.
cors = aiohttp_cors.setup(api, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=False,
        expose_headers="*",
        allow_headers="*",
        allow_methods="*"  # ["GET"] Allowing specific methods
    )
})

cors.add(api.router.add_get('/', handle_v2))
api.router.add_get('/files', list_all_files_db)
api.router.add_get('/files/mix', list_all_files)
api.router.add_get('/tmdb/mix', list_all_files_tmdb)
api.router.add_get('/10/files', list_10_all_files_db)
api.router.add_get('/tmdb/tv', list_all_tmdb_tv_from_db)
api.router.add_get('/tmdb/movies', list_all_tmdb_movies_from_db)

# Bug fix: upload_file reads `await request.post()` (Dropzone sends multipart
# POST chunks); it was registered with add_get and could never receive a body.
api.router.add_post('/upload', upload_file)
api.router.add_get('/dl/{path}', stream_handler)