understanding commited on
Commit
debb592
Β·
verified Β·
1 Parent(s): e09e530

Update bot.py

Browse files
Files changed (1) hide show
  1. bot.py +57 -67
bot.py CHANGED
@@ -4,8 +4,7 @@ import logging
4
  import os
5
  import re
6
  import uuid
7
- import time
8
- from typing import Dict
9
 
10
  from aiogram import Bot, Dispatcher, types, F
11
  from aiogram.filters import CommandStart
@@ -19,6 +18,7 @@ import terabox_utils as terabox
19
  import db_utils
20
 
21
  # --- Setup ---
 
22
  os.makedirs("logs", exist_ok=True)
23
  os.makedirs("downloads", exist_ok=True)
24
 
@@ -43,13 +43,8 @@ bot = Bot(
43
  dp = Dispatcher()
44
  TASK_QUEUE = asyncio.Queue()
45
  BATCH_JOBS: Dict[str, Dict] = {}
46
- START_TIME = time.time()
47
-
48
- # --- Utils ---
49
- def get_uptime():
50
- return round(time.time() - START_TIME)
51
 
52
- # --- Worker ---
53
  async def link_processor_worker(worker_id: int):
54
  logger.info(f"Link worker #{worker_id} started.")
55
  while True:
@@ -62,7 +57,7 @@ async def link_processor_worker(worker_id: int):
62
  continue
63
 
64
  file_info = None
65
- error_msg = f"Unknown error for link: {original_link}"
66
 
67
  try:
68
  short_id = await terabox.extract_terabox_short_id(original_link)
@@ -71,8 +66,8 @@ async def link_processor_worker(worker_id: int):
71
 
72
  cached_file = await db_utils.get_cached_file(short_id)
73
  if cached_file:
74
- logger.info(f"Cache hit for {short_id}.")
75
- file_info = {'cached': True, **cached_file}
76
  else:
77
  download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
78
  if error:
@@ -107,50 +102,36 @@ async def link_processor_worker(worker_id: int):
107
  else:
108
  batch_info["failed_links"].append({"link": original_link, "error": error_msg})
109
 
110
- processed = batch_info['processed_links']
111
- total = batch_info['total_links']
112
- eta = estimate_eta(batch_info)
113
-
114
  try:
115
  await batch_info["status_message"].edit_text(
116
- f"βš™οΈ Batch `{batch_id[:6]}` processing... {processed}/{total} links.\nETA: {eta}s"
 
 
 
117
  )
118
  except TelegramBadRequest:
119
  pass
120
 
121
  if processed == total:
122
- logger.info(f"Batch {batch_id[:6]} complete.")
123
  await handle_batch_completion(batch_id)
124
 
125
  TASK_QUEUE.task_done()
126
 
127
- # --- ETA ---
128
- def estimate_eta(batch_info):
129
- elapsed = time.time() - batch_info["start_time"]
130
- processed = batch_info["processed_links"]
131
- total = batch_info["total_links"]
132
-
133
- if processed == 0:
134
- return "calculating..."
135
-
136
- avg_time_per_link = elapsed / processed
137
- remaining_links = total - processed
138
- eta = int(avg_time_per_link * remaining_links)
139
- return eta
140
-
141
- # --- Send file ---
142
  async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
143
  file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
 
144
  filename = file_info.get('name') or file_info.get('filename')
145
- sent_message = None
146
 
147
  video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
148
  audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
149
 
150
- if filename.lower().endswith(video_exts):
 
151
  sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
152
  media_type = 'video'
153
- elif filename.lower().endswith(audio_exts):
154
  sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
155
  media_type = 'audio'
156
  else:
@@ -165,7 +146,6 @@ async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Me
165
 
166
  return sent_message
167
 
168
- # --- Batch complete ---
169
  async def handle_batch_completion(batch_id: str):
170
  batch = BATCH_JOBS.get(batch_id)
171
  if not batch:
@@ -180,12 +160,12 @@ async def handle_batch_completion(batch_id: str):
180
  [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
181
  )
182
  await status_msg.edit_text(
183
- f"❌ Batch `{batch_id[:6]}` failed. No files processed.\nDetails:\n{failed_links_text}"
184
  )
185
  return
186
 
187
  await status_msg.edit_text(
188
- f"βœ… Batch `{batch_id[:6]}` downloaded. Sending {len(successful_downloads)} files..."
189
  )
190
 
191
  if config.FORWARD_CHANNEL_ID:
@@ -195,24 +175,23 @@ async def handle_batch_completion(batch_id: str):
195
  batch["source_message_id"]
196
  )
197
  for item in successful_downloads:
198
- caption = f"`{item.get('name') or item.get('filename')}`"
199
  await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
200
  await asyncio.sleep(1)
201
 
202
  for item in successful_downloads:
203
- caption = f"`{item.get('name') or item.get('filename')}`"
204
  await send_and_cache_file(batch["source_chat_id"], item, caption)
205
 
206
- summary = f"βœ… Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
207
- if batch["failed_links"]:
208
- summary += f"\n❌ {len(batch['failed_links'])} links failed."
209
-
210
  await status_msg.edit_text(summary)
211
 
212
  except Exception as e:
213
- logger.error(f"Error in batch {batch_id}: {e}", exc_info=True)
214
  await status_msg.edit_text(
215
- f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
216
  )
217
  finally:
218
  for item in successful_downloads:
@@ -223,7 +202,6 @@ async def handle_batch_completion(batch_id: str):
223
 
224
  del BATCH_JOBS[batch_id]
225
 
226
- # --- Message ---
227
  @dp.message(F.text | F.caption)
228
  async def message_handler(message: Message):
229
  if message.text and message.text.startswith('/'):
@@ -236,15 +214,20 @@ async def message_handler(message: Message):
236
  )
237
 
238
  links = list(set(re.findall(r'https?://[^\s<>"\']+', message.text or message.caption or "")))
239
- terabox_links = [link for link in links if any(domain in link for domain in [
240
- "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com", "freeterabox.com", "4funbox.com", "box-links.com"
241
- ])]
 
 
 
242
 
243
  if not terabox_links:
244
  return
245
 
246
  batch_id = str(uuid.uuid4())
247
- status_msg = await message.reply(f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.")
 
 
248
 
249
  BATCH_JOBS[batch_id] = {
250
  "total_links": len(terabox_links),
@@ -255,33 +238,40 @@ async def message_handler(message: Message):
255
  "source_user_id": message.from_user.id,
256
  "source_message_id": message.message_id,
257
  "status_message": status_msg,
258
- "lock": asyncio.Lock(),
259
- "start_time": time.time()
260
  }
261
 
262
  for link in terabox_links:
263
  await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
264
 
265
- # --- Startup ---
 
 
 
 
 
 
 
 
 
 
 
266
  async def on_startup(dispatcher: Dispatcher):
267
  await db_utils.initialize_database()
268
  for i in range(config.CONCURRENT_WORKERS):
269
  asyncio.create_task(link_processor_worker(i + 1))
270
- logger.info(f"Bot started with {config.CONCURRENT_WORKERS} workers.")
271
-
272
- @dp.message(CommandStart())
273
- async def start_handler(message: Message):
274
- text = (
275
- "πŸ‘‹ Welcome to <b>Terabox Downloader Bot</b>!\n\n"
276
- "βœ… Send me <b>Terabox links</b>, I will download and send you the files.\n"
277
- "βœ… Files also forwarded to our channel.\n"
278
- "βœ… Progress + ETA shown.\n\n"
279
- f"Bot uptime: {get_uptime()} sec πŸš€"
280
- )
281
- await message.reply(text)
282
 
283
- async def run_bot():
284
  dp.startup.register(on_startup)
285
  dp.message.register(message_handler)
286
  dp.message.register(start_handler, CommandStart())
287
  await dp.start_polling(bot)
 
 
 
 
 
 
 
 
 
4
  import os
5
  import re
6
  import uuid
7
+ from typing import List, Dict
 
8
 
9
  from aiogram import Bot, Dispatcher, types, F
10
  from aiogram.filters import CommandStart
 
18
  import db_utils
19
 
20
  # --- Setup ---
21
+
22
  os.makedirs("logs", exist_ok=True)
23
  os.makedirs("downloads", exist_ok=True)
24
 
 
43
  dp = Dispatcher()
44
  TASK_QUEUE = asyncio.Queue()
45
  BATCH_JOBS: Dict[str, Dict] = {}
 
 
 
 
 
46
 
47
+ # --- Worker & Batch Logic ---
48
  async def link_processor_worker(worker_id: int):
49
  logger.info(f"Link worker #{worker_id} started.")
50
  while True:
 
57
  continue
58
 
59
  file_info = None
60
+ error_msg = f"An unknown error occurred for link: {original_link}"
61
 
62
  try:
63
  short_id = await terabox.extract_terabox_short_id(original_link)
 
66
 
67
  cached_file = await db_utils.get_cached_file(short_id)
68
  if cached_file:
69
+ logger.info(f"Cache hit for {short_id}. Reusing file_id.")
70
+ file_info = {'cached': True, **cached_file, 'short_id': short_id}
71
  else:
72
  download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
73
  if error:
 
102
  else:
103
  batch_info["failed_links"].append({"link": original_link, "error": error_msg})
104
 
105
+ processed, total = batch_info['processed_links'], batch_info['total_links']
 
 
 
106
  try:
107
  await batch_info["status_message"].edit_text(
108
+ f"<b>βš™οΈ Batch <code>{batch_id[:6]}</code> in progress...</b>\n"
109
+ f"Processed: <b>{processed}/{total}</b>\n"
110
+ f"βœ… Success: <b>{len(batch_info['successful_downloads'])}</b>\n"
111
+ f"❌ Failed: <b>{len(batch_info['failed_links'])}</b>"
112
  )
113
  except TelegramBadRequest:
114
  pass
115
 
116
  if processed == total:
117
+ logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
118
  await handle_batch_completion(batch_id)
119
 
120
  TASK_QUEUE.task_done()
121
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
123
  file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
124
+ media_type = file_info.get('type')
125
  filename = file_info.get('name') or file_info.get('filename')
 
126
 
127
  video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
128
  audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
129
 
130
+ sent_message = None
131
+ if media_type == 'video' or filename.lower().endswith(video_exts):
132
  sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
133
  media_type = 'video'
134
+ elif media_type == 'audio' or filename.lower().endswith(audio_exts):
135
  sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
136
  media_type = 'audio'
137
  else:
 
146
 
147
  return sent_message
148
 
 
149
  async def handle_batch_completion(batch_id: str):
150
  batch = BATCH_JOBS.get(batch_id)
151
  if not batch:
 
160
  [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
161
  )
162
  await status_msg.edit_text(
163
+ f"❌ Batch <code>{batch_id[:6]}</code> failed. No files could be processed.\n\nDetails:\n{failed_links_text}"
164
  )
165
  return
166
 
167
  await status_msg.edit_text(
168
+ f"βœ… Batch <code>{batch_id[:6]}</code> downloaded. Sending <b>{len(successful_downloads)}</b> files..."
169
  )
170
 
171
  if config.FORWARD_CHANNEL_ID:
 
175
  batch["source_message_id"]
176
  )
177
  for item in successful_downloads:
178
+ caption = f"<code>{item.get('name') or item.get('filename')}</code>"
179
  await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
180
  await asyncio.sleep(1)
181
 
182
  for item in successful_downloads:
183
+ caption = f"<code>{item.get('name') or item.get('filename')}</code>"
184
  await send_and_cache_file(batch["source_chat_id"], item, caption)
185
 
186
+ summary = f"βœ… Batch <code>{batch_id[:6]}</code> complete.\n\n" \
187
+ f"πŸ“ Files sent: <b>{len(successful_downloads)}</b>\n" \
188
+ f"❌ Failed: <b>{len(batch['failed_links'])}</b>"
 
189
  await status_msg.edit_text(summary)
190
 
191
  except Exception as e:
192
+ logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
193
  await status_msg.edit_text(
194
+ f"A critical error occurred while sending files for batch <code>{batch_id[:6]}</code>."
195
  )
196
  finally:
197
  for item in successful_downloads:
 
202
 
203
  del BATCH_JOBS[batch_id]
204
 
 
205
  @dp.message(F.text | F.caption)
206
  async def message_handler(message: Message):
207
  if message.text and message.text.startswith('/'):
 
214
  )
215
 
216
  links = list(set(re.findall(r'https?://[^\s<>"\']+', message.text or message.caption or "")))
217
+ terabox_links = [
218
+ link for link in links if any(domain in link for domain in [
219
+ "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
220
+ "freeterabox.com", "4funbox.com", "box-links.com"
221
+ ])
222
+ ]
223
 
224
  if not terabox_links:
225
  return
226
 
227
  batch_id = str(uuid.uuid4())
228
+ status_msg = await message.reply(
229
+ f"βœ… <b>Found {len(terabox_links)} Terabox links</b>.\nBatch ID: <code>{batch_id[:6]}</code>\nProcessing..."
230
+ )
231
 
232
  BATCH_JOBS[batch_id] = {
233
  "total_links": len(terabox_links),
 
238
  "source_user_id": message.from_user.id,
239
  "source_message_id": message.message_id,
240
  "status_message": status_msg,
241
+ "lock": asyncio.Lock()
 
242
  }
243
 
244
  for link in terabox_links:
245
  await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
246
 
247
+ @dp.message(CommandStart())
248
+ async def start_handler(message: Message):
249
+ await message.reply(
250
+ "πŸ‘‹ <b>Welcome to the Terabox Bot!</b>\n\n"
251
+ "Send one or more <b>Terabox links</b> and I will fetch and forward them to you.\n\n"
252
+ "βœ… Fast download\n"
253
+ "βœ… Caching supported\n"
254
+ "βœ… Forward to channel\n\n"
255
+ "Enjoy! πŸš€"
256
+ )
257
+
258
+ # --- Startup & Main ---
259
  async def on_startup(dispatcher: Dispatcher):
260
  await db_utils.initialize_database()
261
  for i in range(config.CONCURRENT_WORKERS):
262
  asyncio.create_task(link_processor_worker(i + 1))
263
+ logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
 
 
 
 
 
 
 
 
 
 
 
264
 
265
+ async def main():
266
  dp.startup.register(on_startup)
267
  dp.message.register(message_handler)
268
  dp.message.register(start_handler, CommandStart())
269
  await dp.start_polling(bot)
270
+
271
+ # For main.py
272
+ async def main_loop_task():
273
+ await main()
274
+
275
+ # Run standalone
276
+ if __name__ == '__main__':
277
+ asyncio.run(main())