understanding commited on
Commit
19fc4ac
Β·
verified Β·
1 Parent(s): ffdffe9

Update bot.py

Browse files
Files changed (1) hide show
  1. bot.py +70 -92
bot.py CHANGED
@@ -1,14 +1,14 @@
1
  # bot.py
 
2
  import asyncio
3
  import logging
4
  import os
5
  import re
6
  import uuid
7
- import time
8
  from typing import Dict
9
 
10
- from aiogram import Bot, Dispatcher, F
11
- from aiogram.filters import CommandStart, Command
12
  from aiogram.types import Message, FSInputFile
13
  from aiogram.enums import ParseMode
14
  from aiogram.exceptions import TelegramBadRequest
@@ -19,31 +19,32 @@ import terabox_utils as terabox
19
  import db_utils
20
 
21
  # --- Setup ---
22
-
23
  os.makedirs("logs", exist_ok=True)
24
  os.makedirs("downloads", exist_ok=True)
25
 
26
  logging.basicConfig(
27
  level=logging.INFO,
28
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
29
  handlers=[
30
  logging.StreamHandler(),
31
- logging.FileHandler("logs/bot.log", mode="a", encoding="utf-8"),
32
- ],
33
  )
34
- logger = logging.getLogger(__name__)
35
 
36
  bot = Bot(
37
  token=config.BOT_TOKEN,
38
- default=DefaultBotProperties(parse_mode=ParseMode.HTML),
 
 
 
39
  )
40
 
41
  dp = Dispatcher()
42
-
43
  TASK_QUEUE = asyncio.Queue()
44
  BATCH_JOBS: Dict[str, Dict] = {}
45
 
46
- # --- Worker ---
47
  async def link_processor_worker(worker_id: int):
48
  logger.info(f"Link worker #{worker_id} started.")
49
  while True:
@@ -56,7 +57,7 @@ async def link_processor_worker(worker_id: int):
56
  continue
57
 
58
  file_info = None
59
- error_msg = f"Unknown error for link: {original_link}"
60
 
61
  try:
62
  short_id = await terabox.extract_terabox_short_id(original_link)
@@ -66,7 +67,7 @@ async def link_processor_worker(worker_id: int):
66
  cached_file = await db_utils.get_cached_file(short_id)
67
  if cached_file:
68
  logger.info(f"Cache hit for {short_id}. Reusing file_id.")
69
- file_info = {"cached": True, **cached_file}
70
  else:
71
  download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
72
  if error:
@@ -77,18 +78,18 @@ async def link_processor_worker(worker_id: int):
77
  batch_info["source_chat_id"],
78
  batch_info["status_message"].message_id,
79
  download_url,
80
- raw_filename,
81
  )
82
  if download_error:
83
  raise ValueError(download_error)
84
 
85
  file_info = {
86
- "cached": False,
87
- "path": local_filepath,
88
- "name": raw_filename,
89
- "size": os.path.getsize(local_filepath),
90
- "short_id": short_id,
91
- "thumb": thumb_path,
92
  }
93
 
94
  except Exception as e:
@@ -101,12 +102,10 @@ async def link_processor_worker(worker_id: int):
101
  else:
102
  batch_info["failed_links"].append({"link": original_link, "error": error_msg})
103
 
104
- processed, total = batch_info["processed_links"], batch_info["total_links"]
105
-
106
  try:
107
  await batch_info["status_message"].edit_text(
108
- f"βš™οΈ Batch `{batch_id[:6]}` progress: {processed}/{total} links processed.\n"
109
- f"ETA: {(total - processed) * 3} sec ⏳"
110
  )
111
  except TelegramBadRequest:
112
  pass
@@ -117,35 +116,36 @@ async def link_processor_worker(worker_id: int):
117
 
118
  TASK_QUEUE.task_done()
119
 
120
- # --- Send & Cache ---
121
  async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
122
- file_path_or_id = file_info.get("file_id") or FSInputFile(file_info["path"], filename=file_info["name"])
123
- media_type = file_info.get("type")
124
- filename = file_info.get("name") or file_info.get("filename")
125
 
126
- video_exts = (".mp4", ".mkv", ".mov", ".avi", ".webm")
127
- audio_exts = (".mp3", ".flac", ".ogg", ".wav")
128
 
129
  sent_message = None
130
- if media_type == "video" or filename.lower().endswith(video_exts):
131
- sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
132
- media_type = "video"
133
- elif media_type == "audio" or filename.lower().endswith(audio_exts):
 
 
 
134
  sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
135
- media_type = "audio"
136
  else:
137
  sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
138
- media_type = "document"
139
 
140
- if not file_info.get("cached") and sent_message:
141
  file_id_to_cache = getattr(sent_message, media_type).file_id
142
  await db_utils.add_to_cache(
143
- file_info["short_id"], file_id_to_cache, filename, media_type, file_info["size"]
144
  )
145
 
146
  return sent_message
147
 
148
- # --- Batch Completion ---
149
  async def handle_batch_completion(batch_id: str):
150
  batch = BATCH_JOBS.get(batch_id)
151
  if not batch:
@@ -156,23 +156,25 @@ async def handle_batch_completion(batch_id: str):
156
 
157
  try:
158
  if not successful_downloads:
159
- failed_links_text = "\n".join([f"- {x['link']} β†’ {x['error']}" for x in batch["failed_links"]])
 
 
160
  await status_msg.edit_text(
161
- f"❌ Batch `{batch_id[:6]}` failed.\nDetails:\n{failed_links_text}"
162
  )
163
  return
164
 
165
  await status_msg.edit_text(
166
- f"βœ… Batch `{batch_id[:6]}` downloaded. Sending {len(successful_downloads)} files..."
167
  )
168
 
 
169
  if config.FORWARD_CHANNEL_ID:
170
  await bot.forward_message(
171
  config.FORWARD_CHANNEL_ID,
172
  batch["source_chat_id"],
173
- batch["source_message_id"],
174
  )
175
-
176
  for item in successful_downloads:
177
  caption = f"`{item.get('name') or item.get('filename')}`"
178
  await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
@@ -195,70 +197,48 @@ async def handle_batch_completion(batch_id: str):
195
  )
196
  finally:
197
  for item in successful_downloads:
198
- if not item.get("cached") and os.path.exists(item["path"]):
199
- os.remove(item["path"])
200
- if item.get("thumb") and os.path.exists(item["thumb"]):
201
- os.remove(item["thumb"])
202
 
203
  del BATCH_JOBS[batch_id]
204
 
205
  # --- Handlers ---
206
  @dp.message(CommandStart())
207
  async def start_handler(message: Message):
208
- await message.reply(
209
- "πŸ‘‹ <b>Welcome to Terabox Bot!</b>\n\n"
210
- "πŸ“₯ Send me any <b>Terabox</b> links.\n"
211
- "βœ… I will download and send the files to you.\n\n"
212
- "ℹ️ Use /status to check current batch progress.\n\n"
213
- "Happy sharing!"
 
214
  )
215
-
216
- @dp.message(Command("status"))
217
- async def status_handler(message: Message):
218
- active_batches = []
219
- for batch_id, batch in BATCH_JOBS.items():
220
- processed = batch["processed_links"]
221
- total = batch["total_links"]
222
- active_batches.append(f"Batch `{batch_id[:6]}`: {processed}/{total}")
223
-
224
- if not active_batches:
225
- await message.reply("βœ… No active batches. All clear.")
226
- else:
227
- await message.reply("πŸ“Š Active batches:\n" + "\n".join(active_batches))
228
 
229
  @dp.message(F.text | F.caption)
230
  async def message_handler(message: Message):
231
- if message.text and message.text.startswith("/"):
232
  return
233
 
234
  await db_utils.add_or_update_user_db(
235
  message.from_user.id,
236
  message.from_user.username,
237
- message.from_user.first_name,
238
  )
239
 
240
- links = list(set(re.findall(r"https?://[^\s<>'\"]+", message.text or message.caption or "")))
241
- terabox_links = [
242
- link
243
- for link in links
244
- if any(x in link for x in [
245
- "terabox.com",
246
- "teraboxapp.com",
247
- "terasharelink.com",
248
- "1024tera.com",
249
- "freeterabox.com",
250
- "4funbox.com",
251
- "box-links.com",
252
- ])
253
- ]
254
 
255
  if not terabox_links:
256
  return
257
 
258
  batch_id = str(uuid.uuid4())
259
- status_msg = await message.reply(
260
- f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.\nProcessing... ⏳"
261
- )
262
 
263
  BATCH_JOBS[batch_id] = {
264
  "total_links": len(terabox_links),
@@ -269,7 +249,7 @@ async def message_handler(message: Message):
269
  "source_user_id": message.from_user.id,
270
  "source_message_id": message.message_id,
271
  "status_message": status_msg,
272
- "lock": asyncio.Lock(),
273
  }
274
 
275
  for link in terabox_links:
@@ -282,10 +262,8 @@ async def on_startup(dispatcher: Dispatcher):
282
  asyncio.create_task(link_processor_worker(i + 1))
283
  logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
284
 
285
- # --- Main ---
286
- def setup_dispatcher():
287
  dp.startup.register(on_startup)
288
- dp.message.register(start_handler, CommandStart())
289
- dp.message.register(status_handler, Command("status"))
290
  dp.message.register(message_handler)
291
- return dp
 
 
1
  # bot.py
2
+
3
  import asyncio
4
  import logging
5
  import os
6
  import re
7
  import uuid
 
8
  from typing import Dict
9
 
10
+ from aiogram import Bot, Dispatcher, types, F
11
+ from aiogram.filters import CommandStart
12
  from aiogram.types import Message, FSInputFile
13
  from aiogram.enums import ParseMode
14
  from aiogram.exceptions import TelegramBadRequest
 
19
  import db_utils
20
 
21
  # --- Setup ---
 
22
  os.makedirs("logs", exist_ok=True)
23
  os.makedirs("downloads", exist_ok=True)
24
 
25
  logging.basicConfig(
26
  level=logging.INFO,
27
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
28
  handlers=[
29
  logging.StreamHandler(),
30
+ logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8')
31
+ ]
32
  )
33
+ logger = logging.getLogger("bot")
34
 
35
  bot = Bot(
36
  token=config.BOT_TOKEN,
37
+ default=DefaultBotProperties(
38
+ parse_mode=ParseMode.HTML,
39
+ link_preview_is_disabled=True
40
+ )
41
  )
42
 
43
  dp = Dispatcher()
 
44
  TASK_QUEUE = asyncio.Queue()
45
  BATCH_JOBS: Dict[str, Dict] = {}
46
 
47
+ # --- Worker Logic ---
48
  async def link_processor_worker(worker_id: int):
49
  logger.info(f"Link worker #{worker_id} started.")
50
  while True:
 
57
  continue
58
 
59
  file_info = None
60
+ error_msg = f"An unknown error occurred for link: {original_link}"
61
 
62
  try:
63
  short_id = await terabox.extract_terabox_short_id(original_link)
 
67
  cached_file = await db_utils.get_cached_file(short_id)
68
  if cached_file:
69
  logger.info(f"Cache hit for {short_id}. Reusing file_id.")
70
+ file_info = {'cached': True, 'short_id': short_id, **cached_file}
71
  else:
72
  download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
73
  if error:
 
78
  batch_info["source_chat_id"],
79
  batch_info["status_message"].message_id,
80
  download_url,
81
+ raw_filename
82
  )
83
  if download_error:
84
  raise ValueError(download_error)
85
 
86
  file_info = {
87
+ 'cached': False,
88
+ 'path': local_filepath,
89
+ 'name': raw_filename,
90
+ 'size': os.path.getsize(local_filepath),
91
+ 'short_id': short_id,
92
+ 'thumb': thumb_path
93
  }
94
 
95
  except Exception as e:
 
102
  else:
103
  batch_info["failed_links"].append({"link": original_link, "error": error_msg})
104
 
105
+ processed, total = batch_info['processed_links'], batch_info['total_links']
 
106
  try:
107
  await batch_info["status_message"].edit_text(
108
+ f"βš™οΈ Batch `{batch_id[:6]}` in progress... Processed {processed}/{total} links."
 
109
  )
110
  except TelegramBadRequest:
111
  pass
 
116
 
117
  TASK_QUEUE.task_done()
118
 
 
119
  async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
120
+ file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
121
+ media_type = file_info.get('type')
122
+ filename = file_info.get('name') or file_info.get('filename')
123
 
124
+ video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
125
+ audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
126
 
127
  sent_message = None
128
+ if media_type == 'video' or filename.lower().endswith(video_exts):
129
+ if file_info.get('thumb') and os.path.exists(file_info['thumb']):
130
+ sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
131
+ else:
132
+ sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
133
+ media_type = 'video'
134
+ elif media_type == 'audio' or filename.lower().endswith(audio_exts):
135
  sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
136
+ media_type = 'audio'
137
  else:
138
  sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
139
+ media_type = 'document'
140
 
141
+ if not file_info.get('cached') and sent_message:
142
  file_id_to_cache = getattr(sent_message, media_type).file_id
143
  await db_utils.add_to_cache(
144
+ file_info['short_id'], file_id_to_cache, filename, media_type, file_info['size']
145
  )
146
 
147
  return sent_message
148
 
 
149
  async def handle_batch_completion(batch_id: str):
150
  batch = BATCH_JOBS.get(batch_id)
151
  if not batch:
 
156
 
157
  try:
158
  if not successful_downloads:
159
+ failed_links_text = "\n".join(
160
+ [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
161
+ )
162
  await status_msg.edit_text(
163
+ f"❌ Batch `{batch_id[:6]}` failed. No files could be processed.\nDetails:\n{failed_links_text}"
164
  )
165
  return
166
 
167
  await status_msg.edit_text(
168
+ f"βœ… Batch `{batch_id[:6]}` downloaded. Preparing to send {len(successful_downloads)} files..."
169
  )
170
 
171
+ # Forward original message if enabled
172
  if config.FORWARD_CHANNEL_ID:
173
  await bot.forward_message(
174
  config.FORWARD_CHANNEL_ID,
175
  batch["source_chat_id"],
176
+ batch["source_message_id"]
177
  )
 
178
  for item in successful_downloads:
179
  caption = f"`{item.get('name') or item.get('filename')}`"
180
  await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
 
197
  )
198
  finally:
199
  for item in successful_downloads:
200
+ if not item.get('cached') and os.path.exists(item['path']):
201
+ os.remove(item['path'])
202
+ if item.get('thumb') and os.path.exists(item['thumb']):
203
+ os.remove(item['thumb'])
204
 
205
  del BATCH_JOBS[batch_id]
206
 
207
  # --- Handlers ---
208
  @dp.message(CommandStart())
209
  async def start_handler(message: Message):
210
+ text = (
211
+ "πŸ‘‹ <b>Welcome to the Terabox Downloader Bot!</b>\n\n"
212
+ "πŸ“₯ Send me any valid Terabox link and I will fetch the file and send it to you.\n\n"
213
+ f"πŸ“’ Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n"
214
+ "πŸš€ Supports batch links & auto-caching.\n"
215
+ "πŸ’Ύ Fast & lightweight.\n\n"
216
+ "βœ… <i>Just send your link below ⬇️</i>"
217
  )
218
+ await message.reply(text)
 
 
 
 
 
 
 
 
 
 
 
 
219
 
220
  @dp.message(F.text | F.caption)
221
  async def message_handler(message: Message):
222
+ if message.text and message.text.startswith('/'):
223
  return
224
 
225
  await db_utils.add_or_update_user_db(
226
  message.from_user.id,
227
  message.from_user.username,
228
+ message.from_user.first_name
229
  )
230
 
231
+ links = list(set(re.findall(r'https?://[^\s<>"\']+', message.text or message.caption or "")))
232
+ terabox_links = [link for link in links if any(domain in link for domain in [
233
+ "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
234
+ "freeterabox.com", "4funbox.com", "box-links.com"
235
+ ])]
 
 
 
 
 
 
 
 
 
236
 
237
  if not terabox_links:
238
  return
239
 
240
  batch_id = str(uuid.uuid4())
241
+ status_msg = await message.reply(f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.")
 
 
242
 
243
  BATCH_JOBS[batch_id] = {
244
  "total_links": len(terabox_links),
 
249
  "source_user_id": message.from_user.id,
250
  "source_message_id": message.message_id,
251
  "status_message": status_msg,
252
+ "lock": asyncio.Lock()
253
  }
254
 
255
  for link in terabox_links:
 
262
  asyncio.create_task(link_processor_worker(i + 1))
263
  logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
264
 
265
+ def start_bot():
 
266
  dp.startup.register(on_startup)
 
 
267
  dp.message.register(message_handler)
268
+ dp.message.register(start_handler, CommandStart())
269
+ return dp, bot