|
import asyncio
import logging
import os
import re
import time

import akenoai as ak
import requests
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.types import *
from pyrogram.errors import *
from pyrogram.enums import *

from config import API_KEY, API_ID, API_HASH, BOT_TOKEN
from scripts import progress
|
|
|
# Root logger: WARNING and above, rendered as "time - logger - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.WARNING,
)

# Pin the "pyrogram" logger to WARNING explicitly as well.
logging.getLogger("pyrogram").setLevel(logging.WARNING)
|
|
|
# Greeting sent in reply to /start (user-facing text, Indonesian).
# Placeholder: {first_name}.
WELCOME_TEXT = """
Hey! {first_name}

Saya adalah bot untuk mengunduh video PornoHub di telegram.

- Jangan share video ke group (mending private bot aman)

- Command: /hubsearch boobs
- Command: /hubdl link xnxx
- Command: /freemode
- command: /check_status

Powered by @xtdevs
"""
|
|
|
# Log-channel entry written when a user sends /start.
# Placeholders: {user_id}, {first_name}, {username}, {user_db}.
USER_TRACK = """
• AkenoHUB

UserID: {user_id}
First Name: {first_name}
Username: @{username}
Check DB: {user_db}
"""
|
|
|
# Log-channel entry written for each search / download request.
# Placeholders: {user_id}, {first_name}, {username}, {type_text}.
USER_LOGS = """
• AkenoHUB

UserID: {user_id}
First Name: {first_name}
Username: @{username}

Text Log: {type_text}
"""
|
|
|
# Bot session named "AkenoHub"; all credentials are imported from config.
client = Client(
    "AkenoHub",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
)

# akenoai client used by the search and download handlers.
api = ak.PornoHub(key=API_KEY)

# Queries that /hubsearch rejects before calling the search API.
not_allowed = ["hi", "hello", "hey", "hai", "help"]
|
|
|
def set_user_update_in_db(user_id: int, first_name: str, username: str):
    """Send a user's details to the ``update_in_db`` endpoint.

    The three values are passed as query parameters of a POST request.

    Args:
        user_id: Telegram user id.
        first_name: User's first name.
        username: User's @username (may be ``None``).

    Returns:
        The decoded JSON body on HTTP 200; ``None`` on any other status,
        on a network error / timeout, or when the body is not valid JSON.
    """
    url = "https://private-akeno.randydev.my.id/api/v2/update_in_db"
    params = {
        "user_id": user_id,
        "first_name": first_name,
        "username": username,
    }
    try:
        # Bounded wait: without a timeout requests may block indefinitely.
        response = requests.post(url, params=params, timeout=15)
        if response.status_code != 200:
            return None
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on older requests releases.
        logging.warning("update_in_db request failed: %s", exc)
        return None
|
|
|
def toggle_free_mode(user_id):
    """POST to the ``akenohub/freemode`` endpoint for ``user_id``.

    Args:
        user_id: Telegram user id, sent as the ``user_id`` query parameter.

    Returns:
        The decoded JSON body of the response. On a network error, timeout
        or undecodable body, a fallback dict with ``free_mode_enabled``
        set to ``False`` and an error ``message`` is returned instead.
    """
    url = "https://private-akeno.randydev.my.id/api/v2/akenohub/freemode"
    try:
        # Let requests build the query string; bound the wait with a timeout.
        response = requests.post(url, params={"user_id": user_id}, timeout=15)
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        logging.warning("freemode request failed: %s", exc)
        return {"message": "Error toggling free mode: ok", "free_mode_enabled": False}
|
|
|
def check_user_expiration(user_id: int):
    """GET the ``akenohub/check`` endpoint for ``user_id``.

    Args:
        user_id: Telegram user id, sent as the ``user_id`` query parameter.

    Returns:
        The decoded JSON body on HTTP 200. On any other status the fallback
        ``{"expired": True, "free_mode": False}`` is returned; on a network
        error, timeout or undecodable body the same fallback is returned
        with an additional ``"error"`` key.
    """
    url = "https://private-akeno.randydev.my.id/api/v2/akenohub/check"
    try:
        # Bounded wait: without a timeout requests may block indefinitely.
        response = requests.get(url, params={"user_id": user_id}, timeout=15)
        if response.status_code == 200:
            return response.json()
        return {"expired": True, "free_mode": False}
    except (requests.RequestException, ValueError) as exc:
        logging.warning("status check request failed: %s", exc)
        return {"expired": True, "free_mode": False, "error": "ok"}
|
|
|
@client.on_message(filters.incoming & filters.private, group=-1)
async def must_join_channel(bot: Client, msg: Message):
    """Gate every incoming private message behind channel membership.

    Registered in handler group -1 (Pyrogram dispatches lower-numbered
    groups first). When ``check_membership`` reports that the sender may
    not use the bot, a "subscribe" prompt with a link button is sent and
    propagation to the remaining handlers is stopped.

    Args:
        bot: The client that received the update.
        msg: The incoming private message.
    """
    MUST_JOIN = "RendyProjects"
    try:
        allowed = await check_membership(MUST_JOIN, bot, msg)
    except ChatAdminRequired:
        # The membership lookup needs rights the bot lacks: let the message
        # through, but leave a trace instead of failing silently.
        logging.warning(
            "Cannot verify membership in %s: admin rights required", MUST_JOIN
        )
        return
    if allowed:
        return

    force_button = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="Rendy Projects",
                    url=f"https://t.me/{MUST_JOIN}",
                )
            ]
        ]
    )
    mention = msg.from_user.mention if msg.from_user else ""
    await msg.reply(
        f"Hey {mention}\n⚠️ To use this bot you have to <b>subscribe to our channel</b>",
        disable_web_page_preview=True,
        reply_markup=force_button,
    )
    # Prevents the command handlers from also processing this message.
    await msg.stop_propagation()
|
|
|
async def check_membership(channel_id, bot, msg):
    """Tell whether the sender of ``msg`` may use the bot.

    Looks the sender up in ``channel_id`` with ``bot.get_chat_member``.

    Args:
        channel_id: Channel to look the sender up in.
        bot: Client used for the lookup and for notifying banned users.
        msg: The incoming message whose sender is checked.

    Returns:
        ``True`` when the lookup succeeds and the member is not banned.
        ``False`` when the lookup raises ``UserNotParticipant``, or when the
        member is banned (a notice with a contact button is sent first).
    """
    sender = msg.from_user
    user_id = sender.id if sender else 0
    try:
        member = await bot.get_chat_member(channel_id, user_id)
    except UserNotParticipant:
        return False

    if member.status != ChatMemberStatus.BANNED:
        return True

    admin_support = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="Developer",
                    url="https://t.me/xtdevs",
                )
            ]
        ]
    )
    # The sender is already attached to the message, so no extra get_users
    # round-trip is needed just to build the mention.
    mention = sender.mention if sender else ""
    # NOTE(review): the leading emoji was reconstructed from mis-decoded
    # bytes in the source — confirm it is the intended glyph.
    await bot.send_message(
        msg.chat.id,
        text=f"❌ you {mention} have been blocked from the group support\n\nclick the button below to contact the group admin",
        reply_markup=admin_support,
    )
    return False
|
|
|
@client.on_message(filters.incoming & filters.command("start") & filters.private)
async def welcome_start(client: Client, message: Message):
    """Handle /start: register the user, log the event, send the greeting.

    The user is first submitted through ``set_user_update_in_db``; when that
    returns ``None`` the user is asked to try again. Otherwise a
    ``USER_TRACK`` entry is posted to the log chat and ``WELCOME_TEXT`` is
    sent back with a channel button.

    Args:
        client: The client that received the update.
        message: The incoming /start message.
    """
    user = message.from_user
    if user is None:
        # Nothing to register or greet without a sender.
        return

    # set_user_update_in_db performs blocking HTTP; run it in the default
    # executor so other updates keep being processed meanwhile.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None, set_user_update_in_db, user.id, user.first_name, user.username
    )
    if response is None:
        return await message.reply_text("Try again problem")

    keyboard = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="📢 Channel",
                    url="https://t.me/RendyProjects",
                )
            ]
        ]
    )
    log_chat_id = -1002290885889
    await client.send_message(
        log_chat_id,
        USER_TRACK.format(
            user_id=user.id,
            first_name=user.first_name,
            username=user.username,
            # .get(): a body without "message" must not abort the greeting.
            user_db=response.get("message"),
        ),
    )
    await message.reply_text(
        WELCOME_TEXT.format(first_name=user.first_name),
        reply_markup=keyboard,
    )
|
|
|
@client.on_message(
    filters.incoming
    & filters.command(["hubsearch"])
    & filters.private
    & ~filters.forwarded
)
async def searchhub(client: Client, message: Message):
    """Handle /hubsearch <query>: search, download the first hit, send it.

    Flow: validate the query, look up the user's status to decide
    ``protect_content`` (disabled only when ``free_mode`` is truthy), search
    with ``api.x_search``, download the first result with
    ``api.x_download``, post a ``USER_LOGS`` entry to the log chat and upload
    the video as a spoiler. Downloaded files are removed afterwards whether
    or not the upload succeeded.

    Args:
        client: The client that received the update.
        message: The incoming command message.
    """
    user = message.from_user
    user_id = user.id

    # Split on any whitespace so "/hubsearch\nfoo" or a command sent as a
    # media caption does not raise.
    parts = (message.text or message.caption or "").split(None, 1)
    query = parts[1].strip() if len(parts) > 1 else None
    if not query:
        return await message.reply_text("please search for pornohub.")

    pro = await message.reply_text("Processing.....")
    # Case-insensitive so "Hi" / "HELLO" are rejected like their lowercase forms.
    if query.lower() in not_allowed:
        return await pro.edit_text("I don't understand, can you help me?")

    file_path = thumb = None
    try:
        # check_user_expiration performs blocking HTTP; keep it off the loop.
        loop = asyncio.get_running_loop()
        user_status = await loop.run_in_executor(
            None, check_user_expiration, user_id
        )
        protect_content = not user_status.get("free_mode", False)

        response = await api.x_search(query=query)
        if not response:  # None or an empty result list
            return await pro.edit_text("nothing found gay")
        link = response[0]

        file_path, thumb, _ = await api.x_download(url=link, is_stream=True)
        # Status text; the source literal was broken across two lines by a
        # mis-decoded character and is rejoined here.
        upload_text = "**⬆️ 𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 video ...**"
        await pro.edit_text(upload_text)

        await client.send_message(
            -1002290885889,
            USER_LOGS.format(
                user_id=user_id,
                first_name=user.first_name,
                username=user.username,
                type_text=query,
            ),
        )
        await client.send_video(
            message.chat.id,
            file_path,
            caption=f"• Powered by {client.me.mention}",
            thumb=thumb,
            has_spoiler=True,
            protect_content=protect_content,
            progress=progress,
            # Extra arguments forwarded to the imported progress callback.
            progress_args=(pro, time.time(), upload_text),
        )
        await pro.delete()
    except Exception as e:
        logging.exception("hubsearch failed for user %s", user_id)
        # edit_text rejects empty text, so fall back to a generic notice.
        await pro.edit_text(str(e) or "Something went wrong.")
    finally:
        # Always clean up downloaded files, including after a failed upload.
        for path in (thumb, file_path):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as exc:
                    logging.warning("could not remove %s: %s", path, exc)
|
|
|
# http(s)://www.xnxx.com/<optional segment>/<word characters>
_XNXX_URL_RE = re.compile(r"(https?)://www\.xnxx\.com/([\w\-]+)?/(\w+)")


def is_pornohub_url(url):
    """Return True when ``url`` starts with a supported www.xnxx.com link.

    Surrounding whitespace is ignored. The link must begin the string, so a
    matching URL buried inside other text (or inside another URL's query
    string) is rejected; anything after the matched prefix is allowed.

    Args:
        url: Candidate text; non-string values are rejected.

    Returns:
        bool: whether the text begins with a matching link.
    """
    if not isinstance(url, str):
        return False
    return _XNXX_URL_RE.match(url.strip()) is not None
|
|
|
@client.on_message(
    filters.incoming
    & filters.command(["hubdl"])
    & filters.private
    & ~filters.forwarded
)
async def porno_download(client: Client, message: Message):
    """Handle /hubdl <link>: download the linked video and send it.

    Flow: validate the link with ``is_pornohub_url``, look up the user's
    status to decide ``protect_content`` (disabled only when ``free_mode``
    is truthy), download with ``api.x_download``, post a ``USER_LOGS`` entry
    to the log chat and upload the video as a spoiler. Downloaded files are
    removed afterwards whether or not the upload succeeded.

    Args:
        client: The client that received the update.
        message: The incoming command message.
    """
    user = message.from_user
    user_id = user.id

    # Split on any whitespace so "/hubdl\n<link>" or a command sent as a
    # media caption does not raise.
    parts = (message.text or message.caption or "").split(None, 1)
    link = parts[1].strip() if len(parts) > 1 else None
    if not link:
        return await message.reply_text("please link for pornohub.")
    if not is_pornohub_url(link):
        return await message.reply_text("invalid link.")

    pro = await message.reply_text("Processing.....")
    file_path = thumb = None
    try:
        # check_user_expiration performs blocking HTTP; keep it off the loop.
        loop = asyncio.get_running_loop()
        user_status = await loop.run_in_executor(
            None, check_user_expiration, user_id
        )
        protect_content = not user_status.get("free_mode", False)

        file_path, thumb, _ = await api.x_download(url=link, is_stream=True)
        # Status text; the source literal was broken across two lines by a
        # mis-decoded character and is rejoined here.
        upload_text = "**⬆️ 𝖴𝗉𝗅𝗈𝖺𝖽𝗂𝗇𝗀 video ...**"
        await pro.edit_text(upload_text)

        await client.send_message(
            -1002290885889,
            USER_LOGS.format(
                user_id=user_id,
                first_name=user.first_name,
                username=user.username,
                type_text=link,
            ),
            disable_web_page_preview=True,
        )
        await client.send_video(
            message.chat.id,
            file_path,
            caption=f"• Powered by {client.me.mention}",
            thumb=thumb,
            has_spoiler=True,
            protect_content=protect_content,
            progress=progress,
            # Extra arguments forwarded to the imported progress callback.
            progress_args=(pro, time.time(), upload_text),
        )
        await pro.delete()
    except Exception as e:
        logging.exception("hubdl failed for user %s", user_id)
        # edit_text rejects empty text, so fall back to a generic notice.
        await pro.edit_text(str(e) or "Something went wrong.")
    finally:
        # Always clean up downloaded files, including after a failed upload.
        for path in (thumb, file_path):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as exc:
                    logging.warning("could not remove %s: %s", path, exc)
|
|
|
@client.on_message(
    filters.incoming
    & filters.command(["freemode"])
    & filters.private
)
async def freemode_handler(client, message):
    """Handle /freemode: toggle free mode for the sender and report the result.

    Replies "activated" when the backend reports ``free_mode_enabled``,
    relays the backend message when it asks the user to wait or when the
    request itself failed, and replies "deactivated" otherwise.

    Args:
        client: The client that received the update.
        message: The incoming command message.
    """
    user_id = message.from_user.id

    # toggle_free_mode performs blocking HTTP; keep it off the event loop.
    loop = asyncio.get_running_loop()
    free_mode_status = await loop.run_in_executor(None, toggle_free_mode, user_id)

    # .get(): a body without "message" must not raise KeyError.
    status_message = str(free_mode_status.get("message") or "")

    if free_mode_status.get("free_mode_enabled"):
        await message.reply(f"Free mode activated for user {user_id}.")
    elif "wait" in status_message or status_message.startswith("Error"):
        # Relay "wait" notices, and do not present a failed request
        # (toggle_free_mode's fallback message) as a deactivation.
        await message.reply(status_message)
    else:
        await message.reply(f"Free mode deactivated for user {user_id}.")
|
|
|
@client.on_message(
    filters.incoming
    & filters.command(["check_status"])
    & filters.private
)
async def check_status_handler(client, message):
    """Handle /check_status: relay the backend's status message to the sender.

    Uses ``check_user_expiration`` for the lookup, so the request and its
    error handling live in one place. When the result carries no
    ``message`` (non-200 response or failed request) a generic error is
    sent instead.

    Args:
        client: The client that received the update.
        message: The incoming command message.
    """
    user_id = message.from_user.id

    # check_user_expiration performs blocking HTTP; keep it off the event loop.
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, check_user_expiration, user_id)

    status_message = data.get("message") if isinstance(data, dict) else None
    if status_message:
        await message.reply(status_message)
    else:
        await message.reply("Error checking status.")
|
|
|
# Entry point: start the bot client; this call blocks until it is stopped.
client.run()