| import asyncio | |
| import os | |
| import time | |
| import tempfile | |
| from typing import Union | |
| from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Voice | |
| import config | |
| from DragMusic import app | |
| from DragMusic.utils.formatters import ( | |
| check_duration, | |
| convert_bytes, | |
| get_readable_time, | |
| seconds_to_min, | |
| ) | |
| class TeleAPI: | |
| def __init__(self): | |
| self.chars_limit = 4096 | |
| self.sleep = 5 | |
| async def send_split_text(self, message, string): | |
| n = self.chars_limit | |
| out = [(string[i : i + n]) for i in range(0, len(string), n)] | |
| j = 0 | |
| for x in out: | |
| if j <= 2: | |
| j += 1 | |
| await message.reply_text(x, disable_web_page_preview=True) | |
| return True | |
| async def get_link(self, message): | |
| return message.link | |
| async def get_filename(self, file, audio: Union[bool, str] = None): | |
| try: | |
| file_name = file.file_name | |
| if file_name is None: | |
| file_name = "ᴛᴇʟᴇɢʀᴀᴍ ᴀᴜᴅɪᴏ" if audio else "ᴛᴇʟᴇɢʀᴀᴍ ᴠɪᴅᴇᴏ" | |
| except: | |
| file_name = "ᴛᴇʟᴇɢʀᴀᴍ ᴀᴜᴅɪᴏ" if audio else "ᴛᴇʟᴇɢʀᴀᴍ ᴠɪᴅᴇᴏ" | |
| return file_name | |
| async def get_duration(self, file): | |
| try: | |
| dur = seconds_to_min(file.duration) | |
| except: | |
| dur = "Unknown" | |
| return dur | |
| async def get_duration(self, filex, file_path): | |
| try: | |
| dur = seconds_to_min(filex.duration) | |
| except: | |
| try: | |
| dur = await asyncio.get_event_loop().run_in_executor( | |
| None, check_duration, file_path | |
| ) | |
| dur = seconds_to_min(dur) | |
| except: | |
| return "Unknown" | |
| return dur | |
| async def get_filepath( | |
| self, | |
| audio: Union[bool, str] = None, | |
| video: Union[bool, str] = None, | |
| ): | |
| if audio: | |
| try: | |
| file_name = ( | |
| audio.file_unique_id | |
| + "." | |
| + ( | |
| (audio.file_name.split(".")[-1]) | |
| if (not isinstance(audio, Voice)) | |
| else "ogg" | |
| ) | |
| ) | |
| except: | |
| file_name = audio.file_unique_id + "." + "ogg" | |
| file_name = os.path.join(tempfile.gettempdir(), file_name) | |
| if video: | |
| try: | |
| file_name = ( | |
| video.file_unique_id + "." + (video.file_name.split(".")[-1]) | |
| ) | |
| except: | |
| file_name = video.file_unique_id + "." + "mp4" | |
| file_name = os.path.join(tempfile.gettempdir(), file_name) | |
| return file_name | |
| async def download(self, _, message, mystic, fname): | |
| lower = [0, 8, 17, 38, 64, 77, 96] | |
| higher = [5, 10, 20, 40, 66, 80, 99] | |
| checker = [5, 10, 20, 40, 66, 80, 99] | |
| speed_counter = {} | |
| if os.path.exists(fname): | |
| return True | |
| async def down_load(): | |
| async def progress(current, total): | |
| if current == total: | |
| return | |
| current_time = time.time() | |
| start_time = speed_counter.get(message.id) | |
| check_time = current_time - start_time | |
| upl = InlineKeyboardMarkup( | |
| [ | |
| [ | |
| InlineKeyboardButton( | |
| text="ᴄᴀɴᴄᴇʟ", | |
| callback_data="stop_downloading", | |
| ), | |
| ] | |
| ] | |
| ) | |
| percentage = current * 100 / total | |
| percentage = str(round(percentage, 2)) | |
| speed = current / check_time | |
| eta = int((total - current) / speed) | |
| eta = get_readable_time(eta) | |
| if not eta: | |
| eta = "0 sᴇᴄᴏɴᴅs" | |
| total_size = convert_bytes(total) | |
| completed_size = convert_bytes(current) | |
| speed = convert_bytes(speed) | |
| percentage = int((percentage.split("."))[0]) | |
| for counter in range(7): | |
| low = int(lower[counter]) | |
| high = int(higher[counter]) | |
| check = int(checker[counter]) | |
| if low < percentage <= high: | |
| if high == check: | |
| try: | |
| await mystic.edit_text( | |
| text=_["tg_1"].format( | |
| app.mention, | |
| total_size, | |
| completed_size, | |
| percentage[:5], | |
| speed, | |
| eta, | |
| ), | |
| reply_markup=upl, | |
| ) | |
| checker[counter] = 100 | |
| except: | |
| pass | |
| speed_counter[message.id] = time.time() | |
| try: | |
| await app.download_media( | |
| message.reply_to_message, | |
| file_name=fname, | |
| progress=progress, | |
| ) | |
| try: | |
| elapsed = get_readable_time( | |
| int(int(time.time()) - int(speed_counter[message.id])) | |
| ) | |
| except: | |
| elapsed = "0 sᴇᴄᴏɴᴅs" | |
| await mystic.edit_text(_["tg_2"].format(elapsed)) | |
| except: | |
| await mystic.edit_text(_["tg_3"]) | |
| task = asyncio.create_task(down_load()) | |
| config.lyrical[mystic.id] = task | |
| await task | |
| verify = config.lyrical.get(mystic.id) | |
| if not verify: | |
| return False | |
| config.lyrical.pop(mystic.id) | |
| return True | |