understanding commited on
Commit
bc7bf12
Β·
verified Β·
1 Parent(s): 41fd54e

Update bot.py

Browse files
Files changed (1) hide show
  1. bot.py +179 -239
bot.py CHANGED
@@ -1,272 +1,212 @@
1
- # bot.py
2
-
3
- import asyncio
4
- import logging
5
- import os
6
- import re
7
- import uuid
8
- from typing import Dict
9
-
10
- from aiogram import Bot, Dispatcher, types, F
11
- from aiogram.filters import CommandStart
12
- from aiogram.types import Message, FSInputFile
13
- from aiogram.enums import ParseMode
14
- from aiogram.exceptions import TelegramBadRequest
15
- from aiogram.client.default import DefaultBotProperties
16
-
17
- import config
18
- import terabox_utils as terabox
19
- import db_utils
20
-
21
- # --- Setup ---
22
- os.makedirs("logs", exist_ok=True)
23
- os.makedirs("downloads", exist_ok=True)
24
-
25
- logging.basicConfig(
26
- level=logging.INFO,
27
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
28
- handlers=[
29
- logging.StreamHandler(),
30
- logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8')
31
- ]
32
- )
33
- logger = logging.getLogger("bot")
34
 
35
- bot = Bot(
36
- token=config.BOT_TOKEN,
37
- default=DefaultBotProperties(
38
- parse_mode=ParseMode.HTML,
39
- link_preview_is_disabled=True
40
- )
41
- )
42
 
43
- dp = Dispatcher()
44
- TASK_QUEUE = asyncio.Queue()
45
- BATCH_JOBS: Dict[str, Dict] = {}
46
 
47
- # --- Worker Logic ---
48
- async def link_processor_worker(worker_id: int):
49
- logger.info(f"Link worker #{worker_id} started.")
50
- while True:
51
- task = await TASK_QUEUE.get()
52
- batch_id, original_link = task["batch_id"], task["original_link"]
53
- batch_info = BATCH_JOBS.get(batch_id)
54
 
55
- if not batch_info:
56
- TASK_QUEUE.task_done()
57
- continue
58
 
59
- file_info = None
60
- error_msg = f"An unknown error occurred for link: {original_link}"
61
 
62
- try:
63
- short_id = await terabox.extract_terabox_short_id(original_link)
64
- if not short_id:
65
- raise ValueError("Invalid Terabox link format.")
66
-
67
- cached_file = await db_utils.get_cached_file(short_id)
68
- if cached_file:
69
- logger.info(f"Cache hit for {short_id}. Reusing file_id.")
70
- file_info = {'cached': True, 'short_id': short_id, **cached_file}
71
- else:
72
- download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
73
- if error:
74
- raise ValueError(error)
75
-
76
- local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
77
- bot,
78
- batch_info["source_chat_id"],
79
- batch_info["status_message"].message_id,
80
- download_url,
81
- raw_filename
82
- )
83
- if download_error:
84
- raise ValueError(download_error)
85
-
86
- file_info = {
87
- 'cached': False,
88
- 'path': local_filepath,
89
- 'name': raw_filename,
90
- 'size': os.path.getsize(local_filepath),
91
- 'short_id': short_id,
92
- 'thumb': thumb_path
93
- }
94
-
95
- except Exception as e:
96
- error_msg = str(e)
97
-
98
- async with batch_info["lock"]:
99
- batch_info["processed_links"] += 1
100
- if file_info:
101
- batch_info["successful_downloads"].append(file_info)
102
- else:
103
- batch_info["failed_links"].append({"link": original_link, "error": error_msg})
104
-
105
- processed, total = batch_info['processed_links'], batch_info['total_links']
106
- try:
107
- await batch_info["status_message"].edit_text(
108
- f"βš™οΈ Batch `{batch_id[:6]}` in progress... Processed {processed}/{total} links."
109
- )
110
- except TelegramBadRequest:
111
- pass
112
-
113
- if processed == total:
114
- logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
115
- await handle_batch_completion(batch_id)
116
 
 
 
 
117
  TASK_QUEUE.task_done()
 
118
 
119
- async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
120
- file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
121
- media_type = file_info.get('type')
122
- filename = file_info.get('name') or file_info.get('filename')
123
 
124
- video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
125
- audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
 
127
- sent_message = None
128
- if media_type == 'video' or filename.lower().endswith(video_exts):
129
- if file_info.get('thumb') and os.path.exists(file_info['thumb']):
130
- sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
 
 
 
131
  else:
132
- sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
133
- media_type = 'video'
134
- elif media_type == 'audio' or filename.lower().endswith(audio_exts):
135
- sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
136
- media_type = 'audio'
137
- else:
138
- sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
139
- media_type = 'document'
140
-
141
- if not file_info.get('cached') and sent_message:
142
- file_id_to_cache = getattr(sent_message, media_type).file_id
143
- await db_utils.add_to_cache(
144
- file_info['short_id'], file_id_to_cache, filename, media_type, file_info['size']
145
- )
146
 
147
- return sent_message
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
- async def handle_batch_completion(batch_id: str):
150
- batch = BATCH_JOBS.get(batch_id)
151
- if not batch:
152
- return
153
 
154
- status_msg = batch["status_message"]
155
- successful_downloads = batch["successful_downloads"]
156
 
157
- try:
158
- if not successful_downloads:
159
- failed_links_text = "\n".join(
160
- [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
161
- )
162
- await status_msg.edit_text(
163
- f"❌ Batch `{batch_id[:6]}` failed. No files could be processed.\nDetails:\n{failed_links_text}"
164
- )
165
- return
166
 
 
 
 
 
 
167
  await status_msg.edit_text(
168
- f"βœ… Batch `{batch_id[:6]}` downloaded. Preparing to send {len(successful_downloads)} files..."
169
  )
 
170
 
171
- # Forward original message if enabled
172
- if config.FORWARD_CHANNEL_ID:
173
- await bot.forward_message(
174
- config.FORWARD_CHANNEL_ID,
175
- batch["source_chat_id"],
176
- batch["source_message_id"]
177
- )
178
- for item in successful_downloads:
179
- caption = f"`{item.get('name') or item.get('filename')}`"
180
- await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
181
- await asyncio.sleep(1)
182
 
 
 
 
 
 
 
183
  for item in successful_downloads:
184
  caption = f"`{item.get('name') or item.get('filename')}`"
185
- await send_and_cache_file(batch["source_chat_id"], item, caption)
 
186
 
187
- summary = f"βœ… Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
188
- if batch["failed_links"]:
189
- summary += f"\n❌ {len(batch['failed_links'])} links failed."
190
 
191
- await status_msg.edit_text(summary)
 
 
192
 
193
- except Exception as e:
194
- logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
195
- await status_msg.edit_text(
196
- f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
197
- )
198
- finally:
199
- for item in successful_downloads:
200
- if not item.get('cached') and os.path.exists(item['path']):
201
- os.remove(item['path'])
202
- if item.get('thumb') and os.path.exists(item['thumb']):
203
- os.remove(item['thumb'])
204
-
205
- del BATCH_JOBS[batch_id]
206
-
207
- # --- Handlers ---
208
- @dp.message(CommandStart())
209
- async def start_handler(message: Message):
210
- text = (
211
- "πŸ‘‹ <b>Welcome to the Terabox Downloader Bot!</b>\n\n"
212
- "πŸ“₯ Send me any valid Terabox link and I will fetch the file and send it to you.\n\n"
213
- f"πŸ“’ Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n"
214
- "πŸš€ Supports batch links & auto-caching.\n"
215
- "πŸ’Ύ Fast & lightweight.\n\n"
216
- "βœ… <i>Just send your link below ⬇️</i>"
217
  )
218
- await message.reply(text, link_preview_is_disabled=True)
 
 
 
 
 
219
 
220
- @dp.message(F.text | F.caption)
221
- async def message_handler(message: Message):
222
- if message.text and message.text.startswith('/'):
223
- return
224
 
225
- await db_utils.add_or_update_user_db(
226
- message.from_user.id,
227
- message.from_user.username,
228
- message.from_user.first_name
229
- )
230
 
231
- links = list(set(re.findall(r'https?://[^\s<>"\']+', message.text or message.caption or "")))
232
- terabox_links = [link for link in links if any(domain in link for domain in [
233
- "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
234
- "freeterabox.com", "4funbox.com", "box-links.com"
235
- ])]
236
 
237
- if not terabox_links:
238
- return
239
 
240
- batch_id = str(uuid.uuid4())
241
- status_msg = await message.reply(
242
- f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.",
243
- link_preview_is_disabled=True
244
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245
 
246
- BATCH_JOBS[batch_id] = {
247
- "total_links": len(terabox_links),
248
- "processed_links": 0,
249
- "successful_downloads": [],
250
- "failed_links": [],
251
- "source_chat_id": message.chat.id,
252
- "source_user_id": message.from_user.id,
253
- "source_message_id": message.message_id,
254
- "status_message": status_msg,
255
- "lock": asyncio.Lock()
256
- }
257
-
258
- for link in terabox_links:
259
- await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
260
-
261
- # --- Startup ---
262
- async def on_startup(dispatcher: Dispatcher):
263
- await db_utils.initialize_database()
264
- for i in range(config.CONCURRENT_WORKERS):
265
- asyncio.create_task(link_processor_worker(i + 1))
266
- logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
267
-
268
- def start_bot():
269
- dp.startup.register(on_startup)
270
- dp.message.register(message_handler)
271
- dp.message.register(start_handler, CommandStart())
272
- return dp, bot
 
1
+ import asyncio import logging import os import re import uuid from typing import Dict
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
+ from aiogram import Bot, Dispatcher, types, F from aiogram.filters import CommandStart from aiogram.types import Message, FSInputFile from aiogram.enums import ParseMode from aiogram.exceptions import TelegramBadRequest from aiogram.client.default import DefaultBotProperties
 
 
 
 
 
 
4
 
5
+ import config import terabox_utils as terabox import db_utils
 
 
6
 
7
+ --- Setup ---
 
 
 
 
 
 
8
 
9
+ os.makedirs("logs", exist_ok=True) os.makedirs("downloads", exist_ok=True)
 
 
10
 
11
+ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8') ] ) logger = logging.getLogger("bot")
 
12
 
13
+ bot = Bot( token=config.BOT_TOKEN, default=DefaultBotProperties( parse_mode=ParseMode.HTML, link_preview_is_disabled=True ) )
14
+
15
+ dp = Dispatcher() TASK_QUEUE = asyncio.Queue() BATCH_JOBS: Dict[str, Dict] = {}
16
+
17
+ --- Worker Logic ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
+ async def link_processor_worker(worker_id: int): logger.info(f"Link worker #{worker_id} started.") while True: task = await TASK_QUEUE.get() batch_id, original_link = task["batch_id"], task["original_link"] batch_info = BATCH_JOBS.get(batch_id)
20
+
21
+ if not batch_info:
22
  TASK_QUEUE.task_done()
23
+ continue
24
 
25
+ file_info = None
26
+ error_msg = f"An unknown error occurred for link: {original_link}"
 
 
27
 
28
+ try:
29
+ short_id = await terabox.extract_terabox_short_id(original_link)
30
+ if not short_id:
31
+ raise ValueError("Invalid Terabox link format.")
32
+
33
+ cached_file = await db_utils.get_cached_file(short_id)
34
+ if cached_file:
35
+ logger.info(f"Cache hit for {short_id}. Reusing file_id.")
36
+ file_info = {'cached': True, 'short_id': short_id, **cached_file}
37
+ else:
38
+ download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
39
+ if error:
40
+ raise ValueError(error)
41
+
42
+ local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
43
+ bot,
44
+ batch_info["source_chat_id"],
45
+ batch_info["status_message"].message_id,
46
+ download_url,
47
+ raw_filename
48
+ )
49
+ if download_error:
50
+ raise ValueError(download_error)
51
+
52
+ file_info = {
53
+ 'cached': False,
54
+ 'path': local_filepath,
55
+ 'name': raw_filename,
56
+ 'size': os.path.getsize(local_filepath),
57
+ 'short_id': short_id,
58
+ 'thumb': thumb_path
59
+ }
60
 
61
+ except Exception as e:
62
+ error_msg = str(e)
63
+
64
+ async with batch_info["lock"]:
65
+ batch_info["processed_links"] += 1
66
+ if file_info:
67
+ batch_info["successful_downloads"].append(file_info)
68
  else:
69
+ batch_info["failed_links"].append({"link": original_link, "error": error_msg})
 
 
 
 
 
 
 
 
 
 
 
 
 
70
 
71
+ processed, total = batch_info['processed_links'], batch_info['total_links']
72
+ try:
73
+ await batch_info["status_message"].edit_text(
74
+ f"βš™οΈ Batch `{batch_id[:6]}` in progress... Processed {processed}/{total} links."
75
+ )
76
+ except TelegramBadRequest:
77
+ pass
78
+
79
+ if processed == total:
80
+ logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
81
+ await handle_batch_completion(batch_id)
82
+
83
+ TASK_QUEUE.task_done()
84
+
85
+ async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message: file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name']) media_type = file_info.get('type') filename = file_info.get('name') or file_info.get('filename')
86
+
87
+ video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
88
+ audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
89
+
90
+ sent_message = None
91
+ if media_type == 'video' or filename.lower().endswith(video_exts):
92
+ sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
93
+ media_type = 'video'
94
+ elif media_type == 'audio' or filename.lower().endswith(audio_exts):
95
+ sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
96
+ media_type = 'audio'
97
+ else:
98
+ sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
99
+ media_type = 'document'
100
+
101
+ if not file_info.get('cached') and sent_message:
102
+ file_id_to_cache = getattr(sent_message, media_type).file_id
103
+ await db_utils.add_to_cache(
104
+ file_info['short_id'], file_id_to_cache, filename, media_type, file_info['size']
105
+ )
106
 
107
+ return sent_message
 
 
 
108
 
109
+ async def handle_batch_completion(batch_id: str): batch = BATCH_JOBS.get(batch_id) if not batch: return
 
110
 
111
+ status_msg = batch["status_message"]
112
+ successful_downloads = batch["successful_downloads"]
 
 
 
 
 
 
 
113
 
114
+ try:
115
+ if not successful_downloads:
116
+ failed_links_text = "\n".join(
117
+ [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
118
+ )
119
  await status_msg.edit_text(
120
+ f"❌ Batch `{batch_id[:6]}` failed. No files could be processed.\nDetails:\n{failed_links_text}"
121
  )
122
+ return
123
 
124
+ await status_msg.edit_text(
125
+ f"βœ… Batch `{batch_id[:6]}` downloaded. Preparing to send {len(successful_downloads)} files..."
126
+ )
 
 
 
 
 
 
 
 
127
 
128
+ if config.FORWARD_CHANNEL_ID:
129
+ await bot.forward_message(
130
+ config.FORWARD_CHANNEL_ID,
131
+ batch["source_chat_id"],
132
+ batch["source_message_id"]
133
+ )
134
  for item in successful_downloads:
135
  caption = f"`{item.get('name') or item.get('filename')}`"
136
+ await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
137
+ await asyncio.sleep(1)
138
 
139
+ for item in successful_downloads:
140
+ caption = f"`{item.get('name') or item.get('filename')}`"
141
+ await send_and_cache_file(batch["source_chat_id"], item, caption)
142
 
143
+ summary = f"βœ… Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
144
+ if batch["failed_links"]:
145
+ summary += f"\n❌ {len(batch['failed_links'])} links failed."
146
 
147
+ await status_msg.edit_text(summary)
148
+
149
+ except Exception as e:
150
+ logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
151
+ await status_msg.edit_text(
152
+ f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
  )
154
+ finally:
155
+ for item in successful_downloads:
156
+ if not item.get('cached') and os.path.exists(item['path']):
157
+ os.remove(item['path'])
158
+ if item.get('thumb') and os.path.exists(item['thumb']):
159
+ os.remove(item['thumb'])
160
 
161
+ del BATCH_JOBS[batch_id]
 
 
 
162
 
163
+ --- Handlers ---
 
 
 
 
164
 
165
+ @dp.message(CommandStart()) async def start_handler(message: Message): text = ( "πŸ‘‹ <b>Welcome to the Terabox Downloader Bot!</b>\n\n" "πŸ“₯ Send me any valid Terabox link and I will fetch the file and send it to you.\n\n" f"πŸ“’ Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n" "πŸš€ Supports batch links & auto-caching.\n" "πŸ’Ύ Fast & lightweight.\n\n" "βœ… <i>Just send your link below ⬇️</i>" ) await bot.send_message( chat_id=message.chat.id, text=text, parse_mode=ParseMode.HTML, disable_web_page_preview=True )
 
 
 
 
166
 
167
+ @dp.message(F.text | F.caption) async def message_handler(message: Message): if message.text and message.text.startswith('/'): return
 
168
 
169
+ await db_utils.add_or_update_user_db(
170
+ message.from_user.id,
171
+ message.from_user.username,
172
+ message.from_user.first_name
173
+ )
174
+
175
+ links = list(set(re.findall(r'https?://[^\s<>\"\']+', message.text or message.caption or "")))
176
+ terabox_links = [link for link in links if any(domain in link for domain in [
177
+ "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
178
+ "freeterabox.com", "4funbox.com", "box-links.com"
179
+ ])]
180
+
181
+ if not terabox_links:
182
+ return
183
+
184
+ batch_id = str(uuid.uuid4())
185
+ status_msg = await bot.send_message(
186
+ chat_id=message.chat.id,
187
+ text=f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.",
188
+ parse_mode=ParseMode.HTML,
189
+ disable_web_page_preview=True
190
+ )
191
+
192
+ BATCH_JOBS[batch_id] = {
193
+ "total_links": len(terabox_links),
194
+ "processed_links": 0,
195
+ "successful_downloads": [],
196
+ "failed_links": [],
197
+ "source_chat_id": message.chat.id,
198
+ "source_user_id": message.from_user.id,
199
+ "source_message_id": message.message_id,
200
+ "status_message": status_msg,
201
+ "lock": asyncio.Lock()
202
+ }
203
+
204
+ for link in terabox_links:
205
+ await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
206
+
207
+ --- Startup ---
208
+
209
+ async def on_startup(dispatcher: Dispatcher): await db_utils.initialize_database() for i in range(config.CONCURRENT_WORKERS): asyncio.create_task(link_processor_worker(i + 1)) logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
210
+
211
+ def start_bot(): dp.startup.register(on_startup) dp.message.register(message_handler) dp.message.register(start_handler, CommandStart()) return dp, bot
212