Spaces:
Paused
Paused
| import os | |
| import requests | |
| from random import randint | |
| from DragMusic.utils.database import ( | |
| add_served_chat, | |
| add_served_user, | |
| blacklisted_chats, | |
| get_lang, | |
| is_banned_user, | |
| is_on_off, | |
| ) | |
| from pykeyboard import InlineKeyboard | |
| from pyrogram import filters | |
| from pyrogram.types import ( | |
| InlineKeyboardButton, | |
| CallbackQuery, | |
| InlineKeyboardMarkup, | |
| Message, | |
| ) | |
| from DragMusic.utils import close_markup | |
| from config import BANNED_USERS, SERVER_PLAYLIST_LIMIT | |
| from DragMusic import Carbon, app | |
| from DragMusic.utils.decorators.language import language, languageCB | |
| from DragMusic.utils.inline.playlist import ( | |
| botplaylist_markup, | |
| get_playlist_markup, | |
| warning_markup, | |
| ) | |
| from DragMusic.utils.pastebin import DragBin | |
| import time | |
| import asyncio | |
| import yt_dlp | |
| from youtube_search import YoutubeSearch | |
| from youtubesearchpython import VideosSearch | |
| from youtubesearchpython import SearchVideos | |
| from DragMusic.utils.stream.stream import stream | |
| from typing import Dict, List, Union | |
| from time import time | |
| import asyncio | |
| from DragMusic.utils.extraction import extract_user | |
# YouTube Data API v3 key used by the search / playlistItems requests in this
# module.
# SECURITY: the literal below was committed in plain text and must be treated
# as compromised -- revoke it and issue a new key. Supply the replacement via
# the YOUTUBE_API_KEY environment variable; the literal is kept only as a
# fallback so existing deployments keep working until the variable is set.
YOUTUBE_API_KEY = (
    os.environ.get("YOUTUBE_API_KEY")
    or 'AIzaSyAisAILkwpcmK7TC79R6UhQ3isSqUnHvhY'
)

# Per-user anti-spam bookkeeping, keyed by the sender's user id:
#   user_last_message_time -> timestamp of the user's most recent command
#   user_command_count     -> commands counted in the current burst
user_last_message_time = {}
user_command_count = {}

# A user is throttled once more than SPAM_THRESHOLD commands arrive with less
# than SPAM_WINDOW_SECONDS between consecutive ones.
SPAM_THRESHOLD = 2
SPAM_WINDOW_SECONDS = 5

from DragMusic.core.mongo import mongodb

# Mongo collection holding one document per user: {"chat_id": ..., "notes": {...}}
playlistdb = mongodb.playlist
playlist = []  # NOTE(review): not referenced in the visible part of this file

# Playlist database helpers
async def _get_playlists(chat_id: int) -> Dict[str, int]:
    """Fetch the stored playlist mapping for ``chat_id``.

    Looks up the document whose ``chat_id`` field matches in ``playlistdb``
    and returns its ``"notes"`` value. When the lookup yields nothing an empty
    dict is returned instead.
    """
    document = await playlistdb.find_one({"chat_id": chat_id})
    return document["notes"] if document else {}
async def get_playlist_names(chat_id: int) -> List[str]:
    """Return the keys of the playlist mapping stored for ``chat_id`` as a new list."""
    stored = await _get_playlists(chat_id)
    return [key for key in stored]
async def get_playlist(chat_id: int, name: str) -> Union[bool, dict]:
    """Look up a single playlist entry.

    Returns the value stored under ``name`` in the mapping for ``chat_id``,
    or ``False`` when no such key exists.
    """
    stored = await _get_playlists(chat_id)
    return stored[name] if name in stored else False
async def save_playlist(chat_id: int, name: str, note: dict):
    """Store ``note`` under ``name`` in the playlist mapping of ``chat_id``.

    The whole mapping is read, updated in memory and written back with an
    upsert, so the document is created if it does not exist yet.
    """
    entries = await _get_playlists(chat_id)
    entries[name] = note
    selector = {"chat_id": chat_id}
    change = {"$set": {"notes": entries}}
    await playlistdb.update_one(selector, change, upsert=True)
async def delete_playlist(chat_id: int, name: str) -> bool:
    """Remove the entry ``name`` from the playlist mapping of ``chat_id``.

    Returns ``True`` after the reduced mapping has been written back, or
    ``False`` (without touching the database) when ``name`` is not present.
    """
    entries = await _get_playlists(chat_id)
    if name not in entries:
        return False

    entries.pop(name)
    await playlistdb.update_one(
        {"chat_id": chat_id},
        {"$set": {"notes": entries}},
        upsert=True,
    )
    return True
# Command names for the handlers in this module.
# NOTE(review): none of these constants is referenced in the visible part of
# the file -- the handler functions carry no registration decorators here.
ADDPLAYLIST_COMMAND = "addplaylist"
SHOWPLAYLIST_COMMAND = "showplaylist"
DELETEPLAYLIST_COMMAND = "delplaylist"
ADDSONG = "addsong"
async def add_song(client, message: Message, _):
    """Search YouTube for a song by name and save the first hit to the sender's playlist.

    The words after the command form the search query. The first result of
    the YouTube Data API search is stored via ``save_playlist`` under its
    video id, with the duration recorded as ``"Unknown"`` (the search
    endpoint response used here carries no duration).

    Args:
        client: The client that received the update (unused).
        message: Incoming command message; ``message.command`` holds the
            command name followed by its arguments.
        _: Unused here.

    Returns:
        The reply message that was sent to the user.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            " Pʟᴇᴀsᴇ ᴘʀᴏᴠɪᴅᴇ ᴀ sᴏɴɢ ɴᴀᴍᴇ ᴀғᴛᴇʀ ᴛʜᴇ ᴄᴏᴍᴍᴀɴᴅ..\n\n➥ Example:\n\n▷ /ᴀᴅᴅsᴏɴɢ Sᴏɴɢ Nᴀᴍᴇ"
        )

    song_name = " ".join(message.command[1:])
    adding = await message.reply_text("Sᴇᴀʀᴄʜɪɴɢ ғᴏʀ ᴛʜᴇ sᴏɴɢ, ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ..")

    def _search():
        # `params` lets requests URL-encode the query, so names containing
        # '&', '#', '+' or non-ASCII text no longer corrupt the request.
        # The timeout keeps a stalled connection from hanging forever.
        response = requests.get(
            "https://www.googleapis.com/youtube/v3/search",
            params={
                "part": "snippet",
                "type": "video",
                "q": song_name,
                "key": YOUTUBE_API_KEY,
            },
            timeout=15,
        )
        return response.json()

    try:
        # requests is blocking; run it in the default executor so the event
        # loop keeps serving other updates while the HTTP call is in flight.
        data = await asyncio.get_running_loop().run_in_executor(None, _search)

        if 'items' not in data or not data['items']:
            # Remove the "searching" notice on this path as well.
            await adding.delete()
            return await message.reply_text("Nᴏ sᴏɴɢ ғᴏᴜɴᴅ ᴡɪᴛʜ ᴛʜᴇ ᴘʀᴏᴠɪᴅᴇᴅ ɴᴀᴍᴇ.")

        user_id = message.from_user.id
        item = data['items'][0]
        video_id = item['id']['videoId']
        song = {
            "videoid": video_id,
            "title": item['snippet']['title'],
            "duration": "Unknown",  # placeholder, see docstring
        }
        await save_playlist(user_id, video_id, song)
        await adding.delete()
        return await message.reply_text(
            text="Sᴏɴɢ ʜᴀs ʙᴇᴇɴ ᴀᴅᴅᴇᴅ sᴜᴄᴄᴇssғᴜʟʟʏ ғʀᴏᴍ ʏᴏᴜʀ sᴇᴀʀᴄʜ\n\n Iғ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ʀᴇᴍᴏᴠᴇ ᴀɴʏ sᴏɴɢ ᴛʜᴇɴ ᴄʟɪᴄᴋ ᴛʜᴇ ʙᴜᴛᴛᴏɴ ʙᴇʟᴏᴡ.\n\n Cʜᴇᴄᴋ ʙʏ » /playlist\n\n Pʟᴀʏ ʙʏ » /Pʟᴀʏ"
        )
    except Exception as e:
        # Handler boundary: report the failure to the user instead of letting
        # the update die silently.
        return await message.reply_text(f"Eʀʀᴏʀ sᴇᴀʀᴄʜɪɴɢ ғᴏʀ sᴏɴɢ: {str(e)}")
async def check_playlist(client, message: Message, _):
    """Reply with an image and a paste link listing the sender's saved playlist.

    Applies the per-user spam throttle first, then builds a numbered text
    listing of every saved song, uploads the full text with ``DragBin`` and
    renders at most the first 17 lines with ``Carbon``.

    Args:
        client: The client that received the update (unused).
        message: Incoming command message.
        _: Mapping indexed with string keys to obtain the user-facing texts.
    """
    user_id = message.from_user.id
    current_time = time()

    # Spam throttle: commands closer together than SPAM_WINDOW_SECONDS are
    # counted as one burst; anything else starts a new burst.
    last_message_time = user_last_message_time.get(user_id, 0)
    if current_time - last_message_time < SPAM_WINDOW_SECONDS:
        user_last_message_time[user_id] = current_time
        user_command_count[user_id] = user_command_count.get(user_id, 0) + 1
        if user_command_count[user_id] > SPAM_THRESHOLD:
            hu = await message.reply_text(
                f"**{message.from_user.mention} ᴘʟᴇᴀsᴇ ᴅᴏɴᴛ ᴅᴏ sᴘᴀᴍ, ᴀɴᴅ ᴛʀʏ ᴀɢᴀɪɴ ᴀғᴛᴇʀ 5 sᴇᴄ**"
            )
            await asyncio.sleep(3)
            await hu.delete()
            return
    else:
        user_command_count[user_id] = 1
        user_last_message_time[user_id] = current_time

    # Read the stored mapping once instead of re-querying the database for
    # every single song.
    songs = await _get_playlists(user_id)
    if not songs:
        return await message.reply_text(_["playlist_3"])
    get = await message.reply_text(_["playlist_2"])

    parts = [_["playlist_4"]]
    for count, _note in enumerate(songs.values(), start=1):
        title = _note["title"].title()
        parts.append(f"\n\n{count}- {title[:70]}\n")
        parts.append(_["playlist_5"].format(_note["duration"]))
    msg = "".join(parts)

    link = await DragBin(msg)

    # The text is assembled with "\n", so split on "\n" rather than
    # os.linesep (which is "\r\n" on Windows and would never match).
    if msg.count("\n") >= 17:
        car = "\n".join(msg.split("\n")[:17])
    else:
        car = msg

    carbon = await Carbon.generate(car, randint(100, 10000000000))
    await get.delete()
    await message.reply_photo(carbon, caption=_["playlist_15"].format(link))
async def get_keyboard(_, user_id):
    """Build the inline keyboard used to delete songs from a user's playlist.

    One button row is added per saved song (callback data
    ``"del_playlist <key>"``), followed by a final row holding the
    "delete everything" and "close" buttons.

    Args:
        _: Mapping indexed with string keys to obtain button labels.
        user_id: Owner of the playlist.

    Returns:
        A ``(keyboard, count)`` tuple where ``count`` is the number of songs.
    """
    keyboard = InlineKeyboard(row_width=5)

    # Single read of the stored mapping; previously each song triggered its
    # own database lookup.
    songs = await _get_playlists(user_id)
    for key, _note in songs.items():
        keyboard.row(
            InlineKeyboardButton(
                text=_note["title"].title(),
                callback_data=f"del_playlist {key}",
            )
        )

    keyboard.row(
        InlineKeyboardButton(
            text=_["PL_B_5"],
            callback_data="delete_warning",
        ),
        InlineKeyboardButton(text=_["CLOSE_BUTTON"], callback_data="close"),
    )
    return keyboard, len(songs)
async def del_plist_msg(client, message: Message, _):
    """Send the sender's playlist as a keyboard of delete buttons.

    Runs the per-user spam throttle, then either reports that nothing is
    saved or posts a status message that is edited into the delete keyboard
    produced by ``get_keyboard``.
    """
    sender = message.from_user.id
    now = time()
    elapsed = now - user_last_message_time.get(sender, 0)

    if elapsed >= SPAM_WINDOW_SECONDS:
        # Quiet long enough: start a fresh burst.
        user_command_count[sender] = 1
        user_last_message_time[sender] = now
    else:
        # Still inside the burst window: count this command.
        user_last_message_time[sender] = now
        user_command_count[sender] = user_command_count.get(sender, 0) + 1
        if user_command_count[sender] > SPAM_THRESHOLD:
            warning = await message.reply_text(
                f"**{message.from_user.mention} ᴘʟᴇᴀsᴇ ᴅᴏɴᴛ ᴅᴏ sᴘᴀᴍ, ᴀɴᴅ ᴛʀʏ ᴀɢᴀɪɴ ᴀғᴛᴇʀ 5 sᴇᴄ**"
            )
            await asyncio.sleep(3)
            await warning.delete()
            return

    names = await get_playlist_names(message.from_user.id)
    if not names:
        return await message.reply_text(_["playlist_3"])

    status = await message.reply_text(_["playlist_2"])
    keyboard, count = await get_keyboard(_, message.from_user.id)
    await status.edit_text(_["playlist_7"].format(count), reply_markup=keyboard)
async def play_playlist(client, CallbackQuery, _):
    """Start streaming the pressing user's saved playlist (callback handler).

    The callback data is expected to be ``"<prefix> <mode>"``; mode ``"v"``
    requests video playback, anything else leaves ``video`` as ``None``.

    Args:
        client: The client that received the update (unused).
        CallbackQuery: The callback query being handled.
        _: Mapping indexed with string keys to obtain the user-facing texts.
    """
    callback_data = CallbackQuery.data.strip()
    mode = callback_data.split(None, 1)[1]
    user_id = CallbackQuery.from_user.id
    _playlist = await get_playlist_names(user_id)
    if not _playlist:
        # Best-effort alert. `except Exception` (not a bare except) so task
        # cancellation is never swallowed.
        try:
            return await CallbackQuery.answer(
                _["playlist_3"],
                show_alert=True,
            )
        except Exception:
            return

    chat_id = CallbackQuery.message.chat.id
    user_name = CallbackQuery.from_user.first_name
    await CallbackQuery.message.delete()
    try:
        await CallbackQuery.answer()
    except Exception:
        pass  # acknowledging the button press is best-effort

    video = True if mode == "v" else None
    mystic = await CallbackQuery.message.reply_text(_["play_1"])
    result = list(_playlist)
    try:
        await stream(
            _,
            mystic,
            user_id,
            result,
            chat_id,
            user_name,
            CallbackQuery.message.chat.id,
            video,
            streamtype="playlist",
        )
    except Exception as e:
        # An exception whose type is named "AssistantErr" is shown as-is;
        # every other failure is reported through the generic template.
        ex_type = type(e).__name__
        err = e if ex_type == "AssistantErr" else _["general_3"].format(ex_type)
        return await mystic.edit_text(err)
    return await mystic.delete()
async def play_playlist_command(client, message, _):
    """Start streaming the sender's saved playlist (command handler).

    The first character of the command name selects the mode: ``"v"``
    requests video playback, anything else leaves ``video`` as ``None``.

    Args:
        client: The client that received the update (unused).
        message: Incoming command message.
        _: Mapping indexed with string keys to obtain the user-facing texts.
    """
    mode = message.command[0][0]
    user_id = message.from_user.id
    _playlist = await get_playlist_names(user_id)
    if not _playlist:
        # Best-effort notice. `except Exception` (not a bare except) so task
        # cancellation is never swallowed.
        try:
            return await message.reply(
                _["playlist_3"],
                quote=True,
            )
        except Exception:
            return

    chat_id = message.chat.id
    user_name = message.from_user.first_name
    try:
        await message.delete()
    except Exception:
        pass  # deleting the command message is optional; failure is ignored

    video = True if mode == "v" else None
    mystic = await message.reply_text(_["play_1"])
    result = list(_playlist)
    try:
        await stream(
            _,
            mystic,
            user_id,
            result,
            chat_id,
            user_name,
            message.chat.id,
            video,
            streamtype="playlist",
        )
    except Exception as e:
        # An exception whose type is named "AssistantErr" is shown as-is;
        # every other failure is reported through the generic template.
        ex_type = type(e).__name__
        err = e if ex_type == "AssistantErr" else _["general_3"].format(ex_type)
        return await mystic.edit_text(err)
    return await mystic.delete()
| import json | |
| # Combined add_playlist function | |
async def add_playlist(client, message: Message, _):
    """Import songs into the sender's saved playlist from the command argument.

    With a ``youtube.com/playlist`` link, up to 50 items are fetched from the
    YouTube Data API ``playlistItems`` endpoint and each one is stored via
    ``save_playlist`` under its video id.

    NOTE(review): the indentation of this function was lost in the copy under
    review and has been reconstructed. As laid out here, the ``else`` after
    the playlist branch returns for every non-playlist argument, which makes
    the channel / ``youtu.be`` / search-by-name branches below it unreachable.
    Confirm the intended nesting against the original file.

    NOTE(review): two later module-level definitions in this file reuse the
    name ``add_playlist`` and therefore replace this function at import time.
    """
    if len(message.command) < 2:
        return await message.reply_text(
            "Pʟᴇᴀsᴇ ᴘʀᴏᴠɪᴅᴇ ᴀ YᴏᴜTᴜʙᴇ ᴘʟᴀʏʟɪsᴛ ʟɪɴᴋ ᴀғᴛᴇʀ ᴛʜᴇ ᴄᴏᴍᴍᴀɴᴅ.\n\n Exᴀᴍᴘʟᴇ: \n\n /ᴀᴅᴅᴘʟᴀʏʟɪsᴛ ʜᴛᴛᴘs://ᴡᴡᴡ.ʏᴏᴜᴛᴜʙᴇ.ᴄᴏᴍ/ᴘʟᴀʏʟɪsᴛ?ʟɪsᴛ=YOUR_PLAYLIST_ID"
        )
    # Only the first argument is inspected for links.
    query = message.command[1]
    # Check if the provided input is a YouTube playlist link
    if "youtube.com/playlist" in query:
        adding = await message.reply_text("Aᴅᴅɪɴɢ sᴏɴɢs ᴛᴏ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ, ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ..")
        # Extract the playlist ID from the link
        playlist_id = query.split("list=")[-1].split("&")[0]
        # Fetch playlist items using YouTube Data API
        # (single request, maxResults=50 -- no pagination is performed)
        url = f'https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&maxResults=50&playlistId={playlist_id}&key={YOUTUBE_API_KEY}'
        try:
            # NOTE(review): blocking HTTP call inside a coroutine, no timeout.
            response = requests.get(url)
            data = response.json()
            if 'items' not in data or not data['items']:
                # NOTE(review): the `adding` status message is not removed here.
                return await message.reply_text("Nᴏ sᴏɴɢs ғᴏᴜɴᴅ ɪɴ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ...")
            user_id = message.from_user.id
            for item in data['items']:
                video_id = item['snippet']['resourceId']['videoId']
                title = item['snippet']['title']
                # Duration is not available in playlistItems, you may need to fetch it separately
                duration = "Unknown" # Placeholder for duration
                # Save the playlist item
                plist = {
                    "videoid": video_id,
                    "title": title,
                    "duration": duration,
                }
                await save_playlist(user_id, video_id, plist)
                # NOTE(review): rebuilt on every iteration although it does
                # not depend on the item; placement inside the loop is part
                # of the reconstructed indentation.
                keyboardes = InlineKeyboardMarkup(
                    [
                        [
                            InlineKeyboardButton(
                                "๏ Want to remove any songs? ๏",
                                callback_data=f"open_playlist {user_id}",
                            )
                        ]
                    ]
                )
            await adding.delete()
            return await message.reply_text(
                text="Aʟʟ sᴏɴɢs ʜᴀᴠᴇ ʙᴇᴇɴ ᴀᴅᴅᴇᴅ sᴜᴄᴄᴇssғᴜʟʟʏ ғʀᴏᴍ ʏᴏᴜʀ YᴏᴜTᴜʙᴇ ᴘʟᴀʏʟɪsᴛ ʟɪɴᴋ\n\n Iғ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ʀᴇᴍᴏᴠᴇ ᴀɴʏ sᴏɴɢ ᴛʜᴇɴ ᴄʟɪᴄᴋ ᴛʜᴇ ʙᴜᴛᴛᴏɴ ʙᴇʟᴏᴡ.\n\n Check by » /playlist**\n\n Play by » /play \n ᴅᴇʟᴇᴛᴇ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ ʙʏ /ᴅᴇʟᴇᴛᴇᴘʟᴀʏʟɪsᴛ \n",
                reply_markup=keyboardes,
            )
        except Exception as e:
            return await message.reply_text(f"Eʀʀᴏʀ ғᴇᴛᴄʜɪɴɢ ᴘʟᴀʏʟɪsᴛ: {str(e)}")
    else:
        return await message.reply_text("Pʟᴇᴀsᴇ ᴘʀᴏᴠɪᴅᴇ ᴀ ᴠᴀʟɪᴅ YᴏᴜTᴜʙᴇ ᴘʟᴀʏʟɪsᴛ ʟɪɴᴋ.")
        pass
    # NOTE(review): with the layout above, nothing from here on can execute.
    if "youtube.com/@" in query:
        addin = await message.reply_text(
            "ᴀᴅᴅɪɴɢ sᴏɴɢs ɪɴ ᴘʟᴀʏʟɪsᴛ ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ..**"
        )
        try:
            from pytube import YouTube
            channel_username = query
            # NOTE(review): `YouTube_videos` is not defined or imported
            # anywhere in the visible file; this would raise NameError, which
            # the handler below turns into an "Error: ..." reply.
            videos = YouTube_videos(f"{query}/videos")
            video_urls = [video["url"] for video in videos]
        except Exception as e:
            # Handle exception
            return await message.reply_text(f"Error: {e}")
        if not video_urls:
            return await message.reply_text(
                "ɴᴏ sᴏɴɢs ғᴏᴜɴᴅ ɪɴ ᴛʜᴇ YᴏᴜTᴜʙᴇ ᴄʜᴀɴɴᴇʟ.\n\n ᴛʀʏ ᴏᴛʜᴇʀ YᴏᴜTᴜʙᴇ ᴄʜᴀɴɴᴇʟ ʟɪɴᴋ"
            )
        user_id = message.from_user.id
        for video_url in video_urls:
            # NOTE(review): derived from `query`, not from `video_url`, so
            # every iteration yields the same id.
            videosid = query.split("/")[-1].split("?")[0]
            try:
                yt = YouTube(f"https://youtu.be/{videosid}")
                title = yt.title
                duration = yt.length
            except Exception as e:
                return await message.reply_text(f"ᴇʀʀᴏʀ ғᴇᴛᴄʜɪɴɢ ᴠɪᴅᴇᴏ ɪɴғᴏ: {e}")
            # NOTE(review): `video_id` is never assigned in this branch
            # (the local computed above is `videosid`).
            plist = {
                "videoid": video_id,
                "title": title,
                "duration": duration,
            }
            await save_playlist(user_id, video_id, plist)
            keyboardes = InlineKeyboardMarkup(
                [
                    [
                        InlineKeyboardButton(
                            "๏ ᴡᴀɴᴛ ʀᴇᴍᴏᴠᴇ ᴀɴʏ sᴏɴɢs? ๏",
                            callback_data=f"open_playlist {user_id}",
                        )
                    ]
                ]
            )
        await addin.delete()
        return await message.reply_text(
            text="ᴀʟʟ sᴏɴɢs ʜᴀs ʙᴇᴇɴ ᴀᴅᴅᴇᴅ sᴜᴄᴄᴇssғᴜʟʟʏ ғʀᴏᴍ ʏᴏᴜʀ ʏᴏᴜᴛᴜʙᴇ channel ʟɪɴᴋ \n\n ɪғ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ʀᴇᴍᴏᴠᴇ ᴀɴʏ sᴏɴɢ ᴛʜᴇɴ ᴄʟɪᴄᴋ ɢɪᴠᴇɴ ʙᴇʟᴏᴡ ʙᴜᴛᴛᴏɴ.\n\n ᴄʜᴇᴄᴋ ʙʏ » /playlist \n\n ᴘʟᴀʏ ʙʏ » /play \n ᴅᴇʟᴇᴛᴇ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ ʙʏ /ᴅᴇʟᴇᴛᴇᴘʟᴀʏʟɪsᴛ \n",
            reply_markup=keyboardes,
        )
        pass
    # Check if the provided input is a YouTube video link
    if "https://youtu.be" in query:
        try:
            add = await message.reply_text(
                "ᴀᴅᴅɪɴɢ sᴏɴɢs ɪɴ ᴘʟᴀʏʟɪsᴛ ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ.."
            )
            from pytube import Playlist
            from pytube import YouTube
            # Extract video ID from the YouTube link
            videoid = query.split("/")[-1].split("?")[0]
            user_id = message.from_user.id
            thumbnail = f"https://img.youtube.com/vi/{videoid}/maxresdefault.jpg"
            # Already saved? Tell the user and stop.
            _check = await get_playlist(user_id, videoid)
            if _check:
                try:
                    await add.delete()
                    return await message.reply_photo(thumbnail, caption=_["playlist_8"])
                except KeyError:
                    pass
            _count = await get_playlist_names(user_id)
            count = len(_count)
            # NOTE(review): equality test -- a playlist already above the
            # limit is not blocked.
            if count == SERVER_PLAYLIST_LIMIT:
                try:
                    return await message.reply_text(
                        _["playlist_9"].format(SERVER_PLAYLIST_LIMIT)
                    )
                except KeyError:
                    pass
            try:
                yt = YouTube(f"https://youtu.be/{videoid}")
                title = yt.title
                duration = yt.length
                thumbnail = f"https://img.youtube.com/vi/{videoid}/maxresdefault.jpg"
                plist = {
                    "videoid": videoid,
                    "title": title,
                    "duration": duration,
                }
                await save_playlist(user_id, videoid, plist)
                # Create inline keyboard with remove button
                keyboard = InlineKeyboardMarkup(
                    [
                        [
                            InlineKeyboardButton(
                                "๏ Remove from Playlist ๏",
                                callback_data=f"remove_playlist {videoid}",
                            )
                        ]
                    ]
                )
                await add.delete()
                await message.reply_photo(
                    thumbnail,
                    caption="ᴀᴅᴅᴇᴅ sᴏɴɢ ɪɴ ʏᴏᴜʀ ʙᴏᴛ ᴘʟᴀʏʟɪsᴛ ✅ \n\n ᴄʜᴇᴄᴋ ʙʏ /playlist \n\n ᴅᴇʟᴇᴛᴇ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ ʙʏ /ᴅᴇʟᴇᴛᴇᴘʟᴀʏʟɪsᴛ\n\n ➥ ᴀɴᴅ ᴘʟᴀʏ ʙʏ » /play (ɢʀᴏᴜᴘs ᴏɴʟʏ)",
                    reply_markup=keyboard,
                )
            except Exception as e:
                print(f"Error: {e}")
                await message.reply_text(str(e))
        except Exception as e:
            return await message.reply_text(str(e))
        pass
    else:
        from DragMusic import YouTube
        # Add a specific song by name
        query = " ".join(message.command[1:])
        print(query)
        try:
            results = YoutubeSearch(query, max_results=1).to_dict()
            link = f"https://youtube.com{results[0]['url_suffix']}"
            title = results[0]["title"][:40]
            thumbnail = results[0]["thumbnails"][0]
            thumb_name = f"{title}.jpg"
            thumb = requests.get(thumbnail, allow_redirects=True)
            # NOTE(review): the file handle is never closed and the downloaded
            # file is neither used nor removed afterwards.
            open(thumb_name, "wb").write(thumb.content)
            duration = results[0]["duration"]
            videoid = results[0]["id"]
            # Add these lines to define views and channel_name
            views = results[0]["views"]
            channel_name = results[0]["channel"]
            user_id = message.from_user.id
            # Already saved? Tell the user and stop.
            _check = await get_playlist(user_id, videoid)
            if _check:
                try:
                    return await message.reply_photo(thumbnail, caption=_["playlist_8"])
                except KeyError:
                    pass
            _count = await get_playlist_names(user_id)
            count = len(_count)
            if count == SERVER_PLAYLIST_LIMIT:
                try:
                    return await message.reply_text(
                        _["playlist_9"].format(SERVER_PLAYLIST_LIMIT)
                    )
                except KeyError:
                    pass
            m = await message.reply("ᴀᴅᴅɪɴɢ ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ...")
            # NOTE(review): this unpacking rebinds `_` (the strings mapping
            # parameter); `_` is not indexed again after this point.
            title, duration_min, _, _, _ = await YouTube.details(videoid, True)
            title = (title[:50]).title()
            plist = {
                "videoid": videoid,
                "title": title,
                "duration": duration_min,
            }
            await save_playlist(user_id, videoid, plist)
            # Create inline keyboard with remove button
            keyboard = InlineKeyboardMarkup(
                [
                    [
                        InlineKeyboardButton(
                            "๏ Remove from Playlist ๏",
                            callback_data=f"remove_playlist {videoid}",
                        )
                    ]
                ]
            )
            await m.delete()
            await message.reply_photo(
                thumbnail,
                caption="ᴀᴅᴅᴇᴅ sᴏɴɢ ɪɴ ʏᴏᴜʀ ʙᴏᴛ ᴘʟᴀʏʟɪsᴛ ✅ \n\n➥ ᴄʜᴇᴄᴋ ʙʏ » /playlist \n\n ᴅᴇʟᴇᴛᴇ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ ʙʏ /ᴅᴇʟᴇᴛᴇᴘʟᴀʏʟɪsᴛ \n\n ᴀɴᴅ ᴘʟᴀʏ ʙʏ » /play (ɢʀᴏᴜᴘs ᴏɴʟʏ)",
                reply_markup=keyboard,
            )
        except KeyError:
            return await message.reply_text("ɪɴᴠᴀʟɪᴅ ᴅᴀᴛᴀ ғᴏʀᴍᴀᴛ ʀᴇᴄᴇɪᴠᴇᴅ.")
        except Exception as e:
            # NOTE(review): every other failure is silently discarded.
            pass
async def open_playlist(client, CallbackQuery, _):
    """Turn the pressed message into the delete keyboard for the presser's playlist.

    When nothing is saved the message text is replaced with the
    ``playlist_3`` string; otherwise it is first set to ``playlist_2`` and
    then edited to the ``playlist_7`` text with the keyboard attached.
    """
    owner = CallbackQuery.from_user.id
    names = await get_playlist_names(owner)
    if not names:
        return await CallbackQuery.message.edit_text(_["playlist_3"])

    status = await CallbackQuery.message.edit_text(_["playlist_2"])
    markup, total = await get_keyboard(_, owner)
    await status.edit_text(_["playlist_7"].format(total), reply_markup=markup)
async def del_plist(client, CallbackQuery, _):
    """Delete one song from the presser's playlist and offer a "recover" button.

    The callback data is expected to be ``"<prefix> <videoid>"``.

    NOTE(review): a later definition in this file reuses the name
    ``del_plist`` and replaces this function at import time.
    """
    callback_data = CallbackQuery.data.strip()
    videoid = callback_data.split(None, 1)[1]
    user_id = CallbackQuery.from_user.id

    deleted = await delete_playlist(user_id, videoid)
    # Alerts are best-effort. `except Exception` (not a bare except) so task
    # cancellation is never swallowed.
    if deleted:
        try:
            await CallbackQuery.answer(_["playlist_11"], show_alert=True)
        except Exception:
            pass
    else:
        try:
            return await CallbackQuery.answer(_["playlist_12"], show_alert=True)
        except Exception:
            return

    keyboards = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    "๏ ʀᴇᴄᴏᴠᴇʀ ʏᴏᴜʀ sᴏɴɢ ๏", callback_data=f"recover_playlist {videoid}"
                )
            ]
        ]
    )
    return await CallbackQuery.edit_message_text(
        text="ʏᴏᴜʀ sᴏɴɢ ʜᴀs ʙᴇᴇɴ ᴅᴇʟᴇᴛᴇᴅ ғʀᴏᴍ ʏᴏᴜʀ ʙᴏᴛ ᴘʟᴀʏʟɪsᴛ \n\n ɪғ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ʀᴇᴄᴏᴠᴇʀ ʏᴏᴜʀ sᴏɴɢ ɪɴ ʏᴏᴜʀ ᴘʟᴀʏʟɪsᴛ ᴛʜᴇɴ ᴄʟɪᴄᴋ ɢɪᴠᴇɴ ʙᴇʟᴏᴡ ʙᴜᴛᴛᴏɴ",
        reply_markup=keyboards,
    )
async def add_playlist(client, CallbackQuery, _):
    """Re-add a song to the presser's playlist and offer a "remove again" button.

    The callback data is expected to be ``"<prefix> <videoid>"``. The song is
    rejected when it is already saved or when the playlist has reached
    ``SERVER_PLAYLIST_LIMIT``.

    NOTE(review): a later definition in this file reuses the name
    ``add_playlist`` and replaces this function at import time.
    """
    from DragMusic import YouTube

    callback_data = CallbackQuery.data.strip()
    videoid = callback_data.split(None, 1)[1]
    user_id = CallbackQuery.from_user.id

    # Alerts are best-effort. `except Exception` (not a bare except) so task
    # cancellation is never swallowed.
    _check = await get_playlist(user_id, videoid)
    if _check:
        try:
            return await CallbackQuery.answer(_["playlist_8"], show_alert=True)
        except Exception:
            return

    _count = await get_playlist_names(user_id)
    # `>=` rather than `==`: a playlist that already exceeds the limit (for
    # example after a bulk import) must stay blocked too.
    if len(_count) >= SERVER_PLAYLIST_LIMIT:
        try:
            return await CallbackQuery.answer(
                _["playlist_9"].format(SERVER_PLAYLIST_LIMIT),
                show_alert=True,
            )
        except Exception:
            return

    (
        title,
        duration_min,
        duration_sec,
        thumbnail,
        vidid,
    ) = await YouTube.details(videoid, True)
    plist = {
        "videoid": vidid,
        "title": (title[:50]).title(),
        "duration": duration_min,
    }
    await save_playlist(user_id, videoid, plist)

    keyboardss = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    "๏ ʀᴇᴍᴏᴠᴇ ᴀɢᴀɪɴ ๏", callback_data=f"remove_playlist {videoid}"
                )
            ]
        ]
    )
    try:
        return await CallbackQuery.edit_message_text(
            text=" ʀᴇᴄᴏᴠᴇʀᴇᴅ sᴏɴɢ ɪɴ ʏᴏᴜʀ ᴘʟᴀʏʟɪsᴛ \n\n Cʜᴇᴄᴋ Pʟᴀʏʟɪsᴛ ʙʏ /playlist \n\n ᴅᴇʟᴇᴛᴇ ᴛʜᴇ ᴘʟᴀʏʟɪsᴛ ʙʏ /ᴅᴇʟᴇᴛᴇᴘʟᴀʏʟɪsᴛ \n\n ᴀɴᴅ ᴘʟᴀʏ ᴘʟᴀʏʟɪsᴛ ʙʏ /play",
            reply_markup=keyboardss,
        )
    except Exception:
        # The song is already saved; failing to update the message is not fatal.
        return
async def add_playlist(client, CallbackQuery, _):
    """Answer the callback with an alert explaining how to use /addplaylist."""
    usage_hint = (
        "➻ᴛᴏ ᴀᴅᴅ ᴀ sᴏɴɢ ɪɴ ʏᴏᴜʀ ᴘʟᴀʏʟɪsᴛ ᴊᴜsᴛ ᴛʏᴘᴇ /addplaylist (Here your song name)"
        "\n\n ᴇxᴀᴍᴘʟᴇ » /addplaylist Blue Eyes Blue tyes."
    )
    await CallbackQuery.answer(usage_hint, show_alert=True)
async def add_playlists(client, CallbackQuery, _):
    """Save the song named in the callback data to the presser's playlist.

    The callback data is expected to be ``"<prefix> <videoid>"``. The song is
    rejected when it is already saved or when the playlist has reached
    ``SERVER_PLAYLIST_LIMIT``; the outcome is reported through an alert.
    """
    from DragMusic import YouTube

    callback_data = CallbackQuery.data.strip()
    videoid = callback_data.split(None, 1)[1]
    user_id = CallbackQuery.from_user.id

    # Alerts are best-effort. `except Exception` (not a bare except) so task
    # cancellation is never swallowed.
    _check = await get_playlist(user_id, videoid)
    if _check:
        try:
            return await CallbackQuery.answer(_["playlist_8"], show_alert=True)
        except Exception:
            return

    _count = await get_playlist_names(user_id)
    # `>=` rather than `==`: a playlist that already exceeds the limit (for
    # example after a bulk import) must stay blocked too.
    if len(_count) >= SERVER_PLAYLIST_LIMIT:
        try:
            return await CallbackQuery.answer(
                _["playlist_9"].format(SERVER_PLAYLIST_LIMIT),
                show_alert=True,
            )
        except Exception:
            return

    (
        title,
        duration_min,
        duration_sec,
        thumbnail,
        vidid,
    ) = await YouTube.details(videoid, True)
    title = (title[:50]).title()
    plist = {
        "videoid": vidid,
        "title": title,
        "duration": duration_min,
    }
    await save_playlist(user_id, videoid, plist)

    try:
        # The alert shows a shorter (30 character) form of the title.
        title = (title[:30]).title()
        return await CallbackQuery.answer(
            _["playlist_10"].format(title), show_alert=True
        )
    except Exception:
        return
# Command name for wiping the whole playlist.
# NOTE(review): not referenced in the visible part of this file.
DELETE_ALL_PLAYLIST_COMMAND = "delallplaylist"
async def delete_all_playlists(client, message, _):
    """Ask the sender to confirm deleting their whole playlist.

    Replies with the ``playlist_14`` text and the ``warning_markup`` keyboard
    when something is saved, or with ``playlist_3`` when the playlist is
    empty. Nothing is deleted here.
    """
    user_id = message.from_user.id
    _playlist = await get_playlist_names(user_id)
    if not _playlist:
        await message.reply_text(_["playlist_3"])
        return

    try:
        upl = warning_markup(_)
        await message.reply_text(_["playlist_14"], reply_markup=upl)
    except Exception:
        # Best-effort prompt. `except Exception` (not a bare except) so task
        # cancellation is never swallowed.
        pass
async def del_plist(client, CallbackQuery, _):
    """Delete one song from the presser's playlist and refresh the delete keyboard.

    The callback data is expected to be ``"<prefix> <videoid>"``. After a
    successful deletion the message's reply markup is replaced with a freshly
    built keyboard from ``get_keyboard``.
    """
    callback_data = CallbackQuery.data.strip()
    videoid = callback_data.split(None, 1)[1]
    user_id = CallbackQuery.from_user.id

    deleted = await delete_playlist(user_id, videoid)
    # Alerts are best-effort. `except Exception` (not a bare except) so task
    # cancellation is never swallowed.
    if deleted:
        try:
            await CallbackQuery.answer(_["playlist_11"], show_alert=True)
        except Exception:
            pass
    else:
        try:
            return await CallbackQuery.answer(_["playlist_12"], show_alert=True)
        except Exception:
            return

    keyboard, count = await get_keyboard(_, user_id)
    return await CallbackQuery.edit_message_reply_markup(reply_markup=keyboard)
async def del_whole_playlist(client, CallbackQuery, _):
    """Delete every song in the presser's playlist and confirm in the message.

    The "please wait" alert is sent once, before the deletions start; the
    message text is then replaced with the ``playlist_13`` string.
    """
    user_id = CallbackQuery.from_user.id
    _playlist = await get_playlist_names(user_id)

    if _playlist:
        # A callback query is acknowledged once; answering it again for every
        # song (as the loop used to do) is redundant at best and may be
        # rejected by the API. Best-effort, like the other alerts in this file.
        try:
            await CallbackQuery.answer(
                "➻ ᴏᴋ sɪʀ ᴘʟᴇᴀsᴇ ᴡᴀɪᴛ.\n\n➥ ᴅᴇʟᴇᴛɪɴɢ ʏᴏᴜʀ ᴘʟᴀʏʟɪsᴛ...", show_alert=True
            )
        except Exception:
            pass

    for x in _playlist:
        await delete_playlist(user_id, x)
    return await CallbackQuery.edit_message_text(_["playlist_13"])
async def get_playlist_playmode_(client, CallbackQuery, _):
    """Replace the message's buttons with the ones from ``get_playlist_markup``."""
    try:
        await CallbackQuery.answer()
    except Exception:
        # Acknowledging the press is best-effort. `except Exception` (not a
        # bare except) so task cancellation is never swallowed.
        pass
    buttons = get_playlist_markup(_)
    return await CallbackQuery.edit_message_reply_markup(
        reply_markup=InlineKeyboardMarkup(buttons)
    )
async def delete_warning_message(client, CallbackQuery, _):
    """Show the delete-confirmation text (``playlist_14``) with ``warning_markup``."""
    try:
        await CallbackQuery.answer()
    except Exception:
        # Acknowledging the press is best-effort. `except Exception` (not a
        # bare except) so task cancellation is never swallowed.
        pass
    upl = warning_markup(_)
    return await CallbackQuery.edit_message_text(_["playlist_14"], reply_markup=upl)
async def home_play_(client, CallbackQuery, _):
    """Replace the message's buttons with the ones from ``botplaylist_markup``."""
    try:
        await CallbackQuery.answer()
    except Exception:
        # Acknowledging the press is best-effort. `except Exception` (not a
        # bare except) so task cancellation is never swallowed.
        pass
    buttons = botplaylist_markup(_)
    return await CallbackQuery.edit_message_reply_markup(
        reply_markup=InlineKeyboardMarkup(buttons)
    )
async def del_back_playlist(client, CallbackQuery, _):
    """Go back to the delete keyboard for the presser's playlist.

    Alerts with ``playlist_3`` and stops when nothing is saved; otherwise
    alerts with ``playlist_2`` and edits the message to the ``playlist_7``
    text with the keyboard from ``get_keyboard``.
    """
    user_id = CallbackQuery.from_user.id
    _playlist = await get_playlist_names(user_id)

    # Alerts are best-effort. `except Exception` (not a bare except) so task
    # cancellation is never swallowed.
    if not _playlist:
        try:
            return await CallbackQuery.answer(_["playlist_3"], show_alert=True)
        except Exception:
            return

    try:
        await CallbackQuery.answer(_["playlist_2"], show_alert=True)
    except Exception:
        pass

    keyboard, count = await get_keyboard(_, user_id)
    return await CallbackQuery.edit_message_text(
        _["playlist_7"].format(count), reply_markup=keyboard
    )