akenohub / main.py
randydev's picture
Update main.py
a811e40 verified
raw
history blame
10.6 kB
import logging
import time
import os
import requests
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.types import *
from pyrogram.errors import *
from pyrogram.enums import *
import akenoai as ak
from config import API_KEY, API_ID, API_HASH, BOT_TOKEN
from scripts import progress
logging.basicConfig(
level=logging.WARNING, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logging.getLogger("pyrogram").setLevel(logging.WARNING)
WELCOME_TEXT = """
Hey! {first_name}
Saya adalah bot untuk mengunduh video PornoHub di telegram.
- Jangan share video ke group (mending private bot aman)
- Command: /hubsearch boobs
- Command: /hubdl link xnxx
- Command: /freemode
- command: /check_status
Powered by @xtdevs
"""
USER_TRACK = """
β€’ AkenoHUB
UserID: {user_id}
First Name: {first_name}
Username: @{username}
Check DB: {user_db}
"""
USER_LOGS = """
β€’ AkenoHUB
UserID: {user_id}
First Name: {first_name}
Username: @{username}
Text Log: {type_text}
"""
client = Client(
"AkenoHub",
api_id=API_ID,
api_hash=API_HASH,
bot_token=BOT_TOKEN
)
api = ak.PornoHub(key=API_KEY)
not_allowed = ["hi", "hello", "hey", "hai", "help"]
def set_user_update_in_db(user_id: int, first_name: str, username: str):
url = "https://private-akeno.randydev.my.id/api/v2/update_in_db"
params = {
"user_id": user_id,
"first_name": first_name,
"username": username
}
response = requests.post(url, params=params)
if response.status_code != 200:
return None
return response.json()
def toggle_free_mode(user_id):
try:
response = requests.post(f"https://private-akeno.randydev.my.id/api/v2/akenohub/freemode?user_id={user_id}")
return response.json()
except requests.RequestException as e:
return {"message": "Error toggling free mode: ok", "free_mode_enabled": False}
def check_user_expiration(user_id: int):
url = f"https://private-akeno.randydev.my.id/api/v2/akenohub/check?user_id={user_id}"
try:
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
return {"expired": True, "free_mode": False}
except requests.RequestException as e:
return {"expired": True, "free_mode": False, "error": "ok"}
@client.on_message(
filters.incoming
& filters.private,
group=-1
)
async def must_join_channel(bot: Client, msg: Message):
MUST_JOIN = "RendyProjects"
try:
if not (
await check_membership(MUST_JOIN, bot, msg)
):
force_button = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Rendy Projects",
url=f"https://t.me/{MUST_JOIN}"
)
]
]
)
mention = msg.from_user.mention if msg.from_user else ""
user_id = msg.from_user.id if msg.from_user else 0
await msg.reply(
f"Hey {mention}\n⚠️ To use this bot you have to <b>subscribe to our channel</b>",
disable_web_page_preview=True,
reply_markup=force_button
)
await msg.stop_propagation()
except ChatAdminRequired:
pass
async def check_membership(channel_id, bot, msg):
try:
user_id = msg.from_user.id if msg.from_user else 0
mention_user = await bot.get_users(user_id)
user = await bot.get_chat_member(channel_id, user_id)
if user.status == ChatMemberStatus.BANNED:
admin_support = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Developer",
url="https://t.me/xtdevs"
)
]
]
)
mention = mention_user.mention if mention_user else ""
await bot.send_message(
msg.chat.id,
text=f"❌ you {mention} have been blocked from the group support\n\nclick the button below to contact the group admin",
reply_markup=admin_support
)
return False
return True
except UserNotParticipant:
return False
@client.on_message(
filters.incoming
& filters.command("start")
& filters.private
)
async def welcome_start(client: Client, message: Message):
response = set_user_update_in_db(
user_id=message.from_user.id,
first_name=message.from_user.first_name,
username=message.from_user.username if message.from_user else None
)
if response is None:
return await message.reply_text("Try again problem")
keyboard = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="πŸ“’ Channel",
url="https://t.me/RendyProjects"
)
]
]
)
await client.send_message(
-1002290885889,
USER_TRACK.format(
user_id=message.from_user.id if message.from_user else 0,
first_name=message.from_user.first_name,
username=message.from_user.username if message.from_user else None,
user_db=response["message"]
)
)
await message.reply_text(
WELCOME_TEXT.format(
first_name=message.from_user.first_name
),
reply_markup=keyboard
)
@client.on_message(
filters.incoming
& filters.command(["hubsearch"])
& filters.private
& ~filters.forwarded
)
async def searchhub(client: Client, message: Message):
user_id = message.from_user.id
query = message.text.split(" ", 1)[1] if len(message.command) > 1 else None
if not query:
return await message.reply_text("please search for pornohub.")
pro = await message.reply_text("Processing.....")
if query in not_allowed:
return await pro.edit_text("I don't understand, can you help me?")
try:
user_status = check_user_expiration(user_id)
protect_content = not user_status["free_mode"]
response = await api.x_search(query=query)
if response is None:
return await pro.edit_text("nothing found gay")
link = response[0]
file_path, thumb, _ = await api.x_download(url=link, is_stream=True)
upload_text = f"**⬆️ π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ video ...**"
await pro.edit_text(upload_text)
await client.send_message(
-1002290885889,
USER_LOGS.format(
user_id=message.from_user.id if message.from_user else 0,
first_name=message.from_user.first_name,
username=message.from_user.username if message.from_user else None,
type_text=query
)
)
await client.send_video(
message.chat.id,
file_path,
caption=f"β€’ Powered by {client.me.mention}",
thumb=thumb,
has_spoiler=True,
protect_content=protect_content,
progress=progress,
progress_args=(
pro,
time.time(),
upload_text
)
)
await pro.delete()
os.remove(thumb)
os.remove(file_path)
except Exception as e:
await pro.edit_text(str(e))
def is_pornohub_url(url):
import re
pattern = r"(https?)://www\.xnxx\.com/([\w\-]+)?/(\w+)"
match = re.search(pattern, url)
return bool(match)
@client.on_message(
filters.incoming
& filters.command(["hubdl"])
& filters.private
& ~filters.forwarded
)
async def porno_download(client: Client, message: Message):
user_id = message.from_user.id
link = message.text.split(" ", 1)[1] if len(message.command) > 1 else None
if not link:
return await message.reply_text("please link for pornohub.")
if not is_pornohub_url(link):
return await message.reply_text("invalid link.")
pro = await message.reply_text("Processing.....")
try:
user_status = check_user_expiration(user_id)
protect_content = not user_status["free_mode"]
file_path, thumb, _ = await api.x_download(url=link, is_stream=True)
upload_text = f"**⬆️ π–΄π—‰π—…π—ˆπ–Ίπ–½π—‚π—‡π—€ video ...**"
await pro.edit_text(upload_text)
await client.send_message(
-1002290885889,
USER_LOGS.format(
user_id=message.from_user.id if message.from_user else 0,
first_name=message.from_user.first_name,
username=message.from_user.username if message.from_user else None,
type_text=link
),
disable_web_page_preview=True
)
await client.send_video(
message.chat.id,
file_path,
caption=f"β€’ Powered by {client.me.mention}",
thumb=thumb,
has_spoiler=True,
protect_content=protect_content,
progress=progress,
progress_args=(
pro,
time.time(),
upload_text
)
)
await pro.delete()
os.remove(thumb)
os.remove(file_path)
except Exception as e:
await pro.edit_text(str(e))
@client.on_message(
filters.incoming
& filters.command(["freemode"])
& filters.private
)
async def freemode_handler(client, message):
user_id = message.from_user.id
free_mode_status = toggle_free_mode(user_id)
if free_mode_status.get("free_mode_enabled"):
await message.reply(f"Free mode activated for user {user_id}.")
elif "wait" in free_mode_status["message"]:
await message.reply(f"{free_mode_status['message']}")
else:
await message.reply(f"Free mode deactivated for user {user_id}.")
@client.on_message(
filters.incoming
& filters.command(["check_status"])
& filters.private
)
async def check_status_handler(client, message):
user_id = message.from_user.id
response = requests.get(f"https://private-akeno.randydev.my.id/api/v2/akenohub/check?user_id={user_id}")
if response.status_code == 200:
data = response.json()
await message.reply(data["message"])
else:
await message.reply("Error checking status.")
client.run()