understanding commited on
Commit
44de040
Β·
verified Β·
1 Parent(s): bc7bf12

Update bot.py

Browse files
Files changed (1) hide show
  1. bot.py +241 -180
bot.py CHANGED
@@ -1,212 +1,273 @@
1
- import asyncio import logging import os import re import uuid from typing import Dict
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
- from aiogram import Bot, Dispatcher, types, F from aiogram.filters import CommandStart from aiogram.types import Message, FSInputFile from aiogram.enums import ParseMode from aiogram.exceptions import TelegramBadRequest from aiogram.client.default import DefaultBotProperties
 
 
 
 
 
 
4
 
5
- import config import terabox_utils as terabox import db_utils
 
 
6
 
7
- --- Setup ---
 
 
 
 
 
 
8
 
9
- os.makedirs("logs", exist_ok=True) os.makedirs("downloads", exist_ok=True)
 
 
10
 
11
- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8') ] ) logger = logging.getLogger("bot")
 
12
 
13
- bot = Bot( token=config.BOT_TOKEN, default=DefaultBotProperties( parse_mode=ParseMode.HTML, link_preview_is_disabled=True ) )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
- dp = Dispatcher() TASK_QUEUE = asyncio.Queue() BATCH_JOBS: Dict[str, Dict] = {}
16
 
17
- --- Worker Logic ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
- async def link_processor_worker(worker_id: int): logger.info(f"Link worker #{worker_id} started.") while True: task = await TASK_QUEUE.get() batch_id, original_link = task["batch_id"], task["original_link"] batch_info = BATCH_JOBS.get(batch_id)
20
 
21
- if not batch_info:
22
- TASK_QUEUE.task_done()
23
- continue
 
24
 
25
- file_info = None
26
- error_msg = f"An unknown error occurred for link: {original_link}"
27
 
28
  try:
29
- short_id = await terabox.extract_terabox_short_id(original_link)
30
- if not short_id:
31
- raise ValueError("Invalid Terabox link format.")
32
-
33
- cached_file = await db_utils.get_cached_file(short_id)
34
- if cached_file:
35
- logger.info(f"Cache hit for {short_id}. Reusing file_id.")
36
- file_info = {'cached': True, 'short_id': short_id, **cached_file}
37
- else:
38
- download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
39
- if error:
40
- raise ValueError(error)
41
-
42
- local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
43
- bot,
44
- batch_info["source_chat_id"],
45
- batch_info["status_message"].message_id,
46
- download_url,
47
- raw_filename
48
  )
49
- if download_error:
50
- raise ValueError(download_error)
51
-
52
- file_info = {
53
- 'cached': False,
54
- 'path': local_filepath,
55
- 'name': raw_filename,
56
- 'size': os.path.getsize(local_filepath),
57
- 'short_id': short_id,
58
- 'thumb': thumb_path
59
- }
60
-
61
- except Exception as e:
62
- error_msg = str(e)
63
-
64
- async with batch_info["lock"]:
65
- batch_info["processed_links"] += 1
66
- if file_info:
67
- batch_info["successful_downloads"].append(file_info)
68
- else:
69
- batch_info["failed_links"].append({"link": original_link, "error": error_msg})
70
-
71
- processed, total = batch_info['processed_links'], batch_info['total_links']
72
- try:
73
- await batch_info["status_message"].edit_text(
74
- f"βš™οΈ Batch `{batch_id[:6]}` in progress... Processed {processed}/{total} links."
75
  )
76
- except TelegramBadRequest:
77
- pass
78
-
79
- if processed == total:
80
- logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
81
- await handle_batch_completion(batch_id)
82
-
83
- TASK_QUEUE.task_done()
84
-
85
- async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message: file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name']) media_type = file_info.get('type') filename = file_info.get('name') or file_info.get('filename')
86
-
87
- video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
88
- audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
89
-
90
- sent_message = None
91
- if media_type == 'video' or filename.lower().endswith(video_exts):
92
- sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
93
- media_type = 'video'
94
- elif media_type == 'audio' or filename.lower().endswith(audio_exts):
95
- sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
96
- media_type = 'audio'
97
- else:
98
- sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
99
- media_type = 'document'
100
-
101
- if not file_info.get('cached') and sent_message:
102
- file_id_to_cache = getattr(sent_message, media_type).file_id
103
- await db_utils.add_to_cache(
104
- file_info['short_id'], file_id_to_cache, filename, media_type, file_info['size']
105
- )
106
-
107
- return sent_message
108
 
109
- async def handle_batch_completion(batch_id: str): batch = BATCH_JOBS.get(batch_id) if not batch: return
110
-
111
- status_msg = batch["status_message"]
112
- successful_downloads = batch["successful_downloads"]
113
-
114
- try:
115
- if not successful_downloads:
116
- failed_links_text = "\n".join(
117
- [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
118
- )
119
  await status_msg.edit_text(
120
- f"❌ Batch `{batch_id[:6]}` failed. No files could be processed.\nDetails:\n{failed_links_text}"
121
  )
122
- return
123
 
124
- await status_msg.edit_text(
125
- f"βœ… Batch `{batch_id[:6]}` downloaded. Preparing to send {len(successful_downloads)} files..."
126
- )
 
 
 
 
 
 
 
127
 
128
- if config.FORWARD_CHANNEL_ID:
129
- await bot.forward_message(
130
- config.FORWARD_CHANNEL_ID,
131
- batch["source_chat_id"],
132
- batch["source_message_id"]
133
- )
134
  for item in successful_downloads:
135
  caption = f"`{item.get('name') or item.get('filename')}`"
136
- await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
137
- await asyncio.sleep(1)
138
-
139
- for item in successful_downloads:
140
- caption = f"`{item.get('name') or item.get('filename')}`"
141
- await send_and_cache_file(batch["source_chat_id"], item, caption)
142
 
143
- summary = f"βœ… Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
144
- if batch["failed_links"]:
145
- summary += f"\n❌ {len(batch['failed_links'])} links failed."
146
 
147
- await status_msg.edit_text(summary)
148
 
149
- except Exception as e:
150
- logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
151
- await status_msg.edit_text(
152
- f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
  )
154
- finally:
155
- for item in successful_downloads:
156
- if not item.get('cached') and os.path.exists(item['path']):
157
- os.remove(item['path'])
158
- if item.get('thumb') and os.path.exists(item['thumb']):
159
- os.remove(item['thumb'])
160
-
161
- del BATCH_JOBS[batch_id]
162
-
163
- --- Handlers ---
164
-
165
- @dp.message(CommandStart()) async def start_handler(message: Message): text = ( "πŸ‘‹ <b>Welcome to the Terabox Downloader Bot!</b>\n\n" "πŸ“₯ Send me any valid Terabox link and I will fetch the file and send it to you.\n\n" f"πŸ“’ Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n" "πŸš€ Supports batch links & auto-caching.\n" "πŸ’Ύ Fast & lightweight.\n\n" "βœ… <i>Just send your link below ⬇️</i>" ) await bot.send_message( chat_id=message.chat.id, text=text, parse_mode=ParseMode.HTML, disable_web_page_preview=True )
166
-
167
- @dp.message(F.text | F.caption) async def message_handler(message: Message): if message.text and message.text.startswith('/'): return
168
-
169
- await db_utils.add_or_update_user_db(
170
- message.from_user.id,
171
- message.from_user.username,
172
- message.from_user.first_name
173
- )
174
-
175
- links = list(set(re.findall(r'https?://[^\s<>\"\']+', message.text or message.caption or "")))
176
- terabox_links = [link for link in links if any(domain in link for domain in [
177
- "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
178
- "freeterabox.com", "4funbox.com", "box-links.com"
179
- ])]
180
-
181
- if not terabox_links:
182
- return
183
-
184
- batch_id = str(uuid.uuid4())
185
- status_msg = await bot.send_message(
186
- chat_id=message.chat.id,
187
- text=f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.",
188
- parse_mode=ParseMode.HTML,
189
- disable_web_page_preview=True
190
- )
191
 
192
- BATCH_JOBS[batch_id] = {
193
- "total_links": len(terabox_links),
194
- "processed_links": 0,
195
- "successful_downloads": [],
196
- "failed_links": [],
197
- "source_chat_id": message.chat.id,
198
- "source_user_id": message.from_user.id,
199
- "source_message_id": message.message_id,
200
- "status_message": status_msg,
201
- "lock": asyncio.Lock()
202
- }
203
 
204
- for link in terabox_links:
205
- await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
 
 
 
206
 
207
- --- Startup ---
 
 
 
 
208
 
209
- async def on_startup(dispatcher: Dispatcher): await db_utils.initialize_database() for i in range(config.CONCURRENT_WORKERS): asyncio.create_task(link_processor_worker(i + 1)) logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
 
210
 
211
- def start_bot(): dp.startup.register(on_startup) dp.message.register(message_handler) dp.message.register(start_handler, CommandStart()) return dp, bot
 
 
 
 
 
 
212
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ import os
4
+ import re
5
+ import uuid
6
+ from typing import Dict
7
+
8
+ from aiogram import Bot, Dispatcher, types, F
9
+ from aiogram.filters import CommandStart
10
+ from aiogram.types import Message, FSInputFile
11
+ from aiogram.enums import ParseMode
12
+ from aiogram.exceptions import TelegramBadRequest
13
+ from aiogram.client.default import DefaultBotProperties
14
+
15
+ import config
16
+ import terabox_utils as terabox
17
+ import db_utils
18
+
19
+ # --- Setup ---
20
+ os.makedirs("logs", exist_ok=True)
21
+ os.makedirs("downloads", exist_ok=True)
22
+
23
+ logging.basicConfig(
24
+ level=logging.INFO,
25
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
26
+ handlers=[
27
+ logging.StreamHandler(),
28
+ logging.FileHandler("logs/bot.log", mode='a', encoding='utf-8')
29
+ ]
30
+ )
31
+ logger = logging.getLogger("bot")
32
 
33
+ bot = Bot(
34
+ token=config.BOT_TOKEN,
35
+ default=DefaultBotProperties(
36
+ parse_mode=ParseMode.HTML,
37
+ link_preview_is_disabled=True
38
+ )
39
+ )
40
 
41
+ dp = Dispatcher()
42
+ TASK_QUEUE = asyncio.Queue()
43
+ BATCH_JOBS: Dict[str, Dict] = {}
44
 
45
+ # --- Worker Logic ---
46
+ async def link_processor_worker(worker_id: int):
47
+ logger.info(f"Link worker #{worker_id} started.")
48
+ while True:
49
+ task = await TASK_QUEUE.get()
50
+ batch_id, original_link = task["batch_id"], task["original_link"]
51
+ batch_info = BATCH_JOBS.get(batch_id)
52
 
53
+ if not batch_info:
54
+ TASK_QUEUE.task_done()
55
+ continue
56
 
57
+ file_info = None
58
+ error_msg = f"An unknown error occurred for link: {original_link}"
59
 
60
+ try:
61
+ short_id = await terabox.extract_terabox_short_id(original_link)
62
+ if not short_id:
63
+ raise ValueError("Invalid Terabox link format.")
64
+
65
+ cached_file = await db_utils.get_cached_file(short_id)
66
+ if cached_file:
67
+ logger.info(f"Cache hit for {short_id}. Reusing file_id.")
68
+ file_info = {'cached': True, 'short_id': short_id, **cached_file}
69
+ else:
70
+ download_url, raw_filename, error = await terabox.get_final_url_and_filename(original_link)
71
+ if error:
72
+ raise ValueError(error)
73
+
74
+ local_filepath, thumb_path, download_error = await terabox.download_terabox_file(
75
+ bot,
76
+ batch_info["source_chat_id"],
77
+ batch_info["status_message"].message_id,
78
+ download_url,
79
+ raw_filename
80
+ )
81
+ if download_error:
82
+ raise ValueError(download_error)
83
+
84
+ file_info = {
85
+ 'cached': False,
86
+ 'path': local_filepath,
87
+ 'name': raw_filename,
88
+ 'size': os.path.getsize(local_filepath),
89
+ 'short_id': short_id,
90
+ 'thumb': thumb_path
91
+ }
92
+
93
+ except Exception as e:
94
+ error_msg = str(e)
95
+
96
+ async with batch_info["lock"]:
97
+ batch_info["processed_links"] += 1
98
+ if file_info:
99
+ batch_info["successful_downloads"].append(file_info)
100
+ else:
101
+ batch_info["failed_links"].append({"link": original_link, "error": error_msg})
102
+
103
+ processed, total = batch_info['processed_links'], batch_info['total_links']
104
+ try:
105
+ await batch_info["status_message"].edit_text(
106
+ f"βš™οΈ Batch `{batch_id[:6]}` in progress... Processed {processed}/{total} links."
107
+ )
108
+ except TelegramBadRequest:
109
+ pass
110
+
111
+ if processed == total:
112
+ logger.info(f"Batch {batch_id[:6]} complete. Triggering final processing.")
113
+ await handle_batch_completion(batch_id)
114
 
115
+ TASK_QUEUE.task_done()
116
 
117
+ async def send_and_cache_file(chat_id: int, file_info: dict, caption: str) -> Message:
118
+ file_path_or_id = file_info.get('file_id') or FSInputFile(file_info['path'], filename=file_info['name'])
119
+ media_type = file_info.get('type')
120
+ filename = file_info.get('name') or file_info.get('filename')
121
+
122
+ video_exts = ('.mp4', '.mkv', '.mov', '.avi', '.webm')
123
+ audio_exts = ('.mp3', '.flac', '.ogg', '.wav')
124
+
125
+ sent_message = None
126
+ if media_type == 'video' or filename.lower().endswith(video_exts):
127
+ sent_message = await bot.send_video(chat_id, file_path_or_id, caption=caption, supports_streaming=True)
128
+ media_type = 'video'
129
+ elif media_type == 'audio' or filename.lower().endswith(audio_exts):
130
+ sent_message = await bot.send_audio(chat_id, file_path_or_id, caption=caption)
131
+ media_type = 'audio'
132
+ else:
133
+ sent_message = await bot.send_document(chat_id, file_path_or_id, caption=caption)
134
+ media_type = 'document'
135
+
136
+ if not file_info.get('cached') and sent_message:
137
+ file_id_to_cache = getattr(sent_message, media_type).file_id
138
+ await db_utils.add_to_cache(
139
+ file_info['short_id'], file_id_to_cache, filename, media_type, file_info['size']
140
+ )
141
 
142
+ return sent_message
143
 
144
+ async def handle_batch_completion(batch_id: str):
145
+ batch = BATCH_JOBS.get(batch_id)
146
+ if not batch:
147
+ return
148
 
149
+ status_msg = batch["status_message"]
150
+ successful_downloads = batch["successful_downloads"]
151
 
152
  try:
153
+ if not successful_downloads:
154
+ failed_links_text = "\n".join(
155
+ [f"- {x['link']} β†’ {x['error']}" for x in batch['failed_links']]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  )
157
+ await status_msg.edit_text(
158
+ f"❌ Batch `{batch_id[:6]}` failed. No files could be processed.\nDetails:\n{failed_links_text}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  )
160
+ return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
 
 
 
 
 
 
 
 
 
 
 
162
  await status_msg.edit_text(
163
+ f"βœ… Batch `{batch_id[:6]}` downloaded. Preparing to send {len(successful_downloads)} files..."
164
  )
 
165
 
166
+ if config.FORWARD_CHANNEL_ID:
167
+ await bot.forward_message(
168
+ config.FORWARD_CHANNEL_ID,
169
+ batch["source_chat_id"],
170
+ batch["source_message_id"]
171
+ )
172
+ for item in successful_downloads:
173
+ caption = f"`{item.get('name') or item.get('filename')}`"
174
+ await send_and_cache_file(config.FORWARD_CHANNEL_ID, item, caption)
175
+ await asyncio.sleep(1)
176
 
 
 
 
 
 
 
177
  for item in successful_downloads:
178
  caption = f"`{item.get('name') or item.get('filename')}`"
179
+ await send_and_cache_file(batch["source_chat_id"], item, caption)
 
 
 
 
 
180
 
181
+ summary = f"βœ… Batch `{batch_id[:6]}` complete: {len(successful_downloads)} files sent."
182
+ if batch["failed_links"]:
183
+ summary += f"\n❌ {len(batch['failed_links'])} links failed."
184
 
185
+ await status_msg.edit_text(summary)
186
 
187
+ except Exception as e:
188
+ logger.error(f"Error during batch completion for {batch_id}: {e}", exc_info=True)
189
+ await status_msg.edit_text(
190
+ f"A critical error occurred while sending files for batch `{batch_id[:6]}`."
191
+ )
192
+ finally:
193
+ for item in successful_downloads:
194
+ if not item.get('cached') and os.path.exists(item['path']):
195
+ os.remove(item['path'])
196
+ if item.get('thumb') and os.path.exists(item['thumb']):
197
+ os.remove(item['thumb'])
198
+
199
+ del BATCH_JOBS[batch_id]
200
+
201
+ # --- Handlers ---
202
+ @dp.message(CommandStart())
203
+ async def start_handler(message: Message):
204
+ text = (
205
+ "πŸ‘‹ <b>Welcome to the Terabox Downloader Bot!</b>\n\n"
206
+ "πŸ“₯ Send me any valid Terabox link and I will fetch the file and send it to you.\n\n"
207
+ f"πŸ“’ Please make sure you are a member of: @{config.FORCE_SUB_CHANNEL_USERNAME}\n\n"
208
+ "πŸš€ Supports batch links & auto-caching.\n"
209
+ "πŸ’Ύ Fast & lightweight.\n\n"
210
+ "βœ… <i>Just send your link below ⬇️</i>"
211
+ )
212
+ await bot.send_message(
213
+ chat_id=message.chat.id,
214
+ text=text,
215
+ parse_mode=ParseMode.HTML,
216
+ disable_web_page_preview=True
217
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
 
219
+ @dp.message(F.text | F.caption)
220
+ async def message_handler(message: Message):
221
+ if message.text and message.text.startswith('/'):
222
+ return
 
 
 
 
 
 
 
223
 
224
+ await db_utils.add_or_update_user_db(
225
+ message.from_user.id,
226
+ message.from_user.username,
227
+ message.from_user.first_name
228
+ )
229
 
230
+ links = list(set(re.findall(r'https?://[^\s<>\"\']+', message.text or message.caption or "")))
231
+ terabox_links = [link for link in links if any(domain in link for domain in [
232
+ "terabox.com", "teraboxapp.com", "terasharelink.com", "1024tera.com",
233
+ "freeterabox.com", "4funbox.com", "box-links.com"
234
+ ])]
235
 
236
+ if not terabox_links:
237
+ return
238
 
239
+ batch_id = str(uuid.uuid4())
240
+ status_msg = await bot.send_message(
241
+ chat_id=message.chat.id,
242
+ text=f"βœ… Found {len(terabox_links)} links. Queued as batch `{batch_id[:6]}`.",
243
+ parse_mode=ParseMode.HTML,
244
+ disable_web_page_preview=True
245
+ )
246
 
247
+ BATCH_JOBS[batch_id] = {
248
+ "total_links": len(terabox_links),
249
+ "processed_links": 0,
250
+ "successful_downloads": [],
251
+ "failed_links": [],
252
+ "source_chat_id": message.chat.id,
253
+ "source_user_id": message.from_user.id,
254
+ "source_message_id": message.message_id,
255
+ "status_message": status_msg,
256
+ "lock": asyncio.Lock()
257
+ }
258
+
259
+ for link in terabox_links:
260
+ await TASK_QUEUE.put({"batch_id": batch_id, "original_link": link})
261
+
262
+ # --- Startup ---
263
+ async def on_startup(dispatcher: Dispatcher):
264
+ await db_utils.initialize_database()
265
+ for i in range(config.CONCURRENT_WORKERS):
266
+ asyncio.create_task(link_processor_worker(i + 1))
267
+ logger.info(f"Bot started with {config.CONCURRENT_WORKERS} concurrent workers.")
268
+
269
+ def start_bot():
270
+ dp.startup.register(on_startup)
271
+ dp.message.register(message_handler)
272
+ dp.message.register(start_handler, CommandStart())
273
+ return dp, bot