taslim19
Fix quotly.py: remove await on bytes object in all cases
28baa10
raw
history blame
10.6 kB
from io import BytesIO
from pyrogram import filters
from pyrogram.types import Message
from DragMusic import app
import httpx
# -----------------------------------------------------------------
fetch = httpx.AsyncClient(
http2=True,
verify=False,
headers={
"Accept-Language": "id-ID",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edge/107.0.1418.42",
},
timeout=httpx.Timeout(20),
)
# ------------------------------------------------------------------------
class QuotlyException(Exception):
pass
# --------------------------------------------------------------------------
async def get_message_sender_id(ctx: Message):
if ctx.forward_date:
if ctx.forward_sender_name:
return 1
elif ctx.forward_from:
return ctx.forward_from.id
elif ctx.forward_from_chat:
return ctx.forward_from_chat.id
else:
return 1
elif ctx.from_user:
return ctx.from_user.id
elif ctx.sender_chat:
return ctx.sender_chat.id
else:
return 1
# -----------------------------------------------------------------------------------------
async def get_message_sender_name(ctx: Message):
if ctx.forward_date:
if ctx.forward_sender_name:
return ctx.forward_sender_name
elif ctx.forward_from:
return (
f"{ctx.forward_from.first_name} {ctx.forward_from.last_name}"
if ctx.forward_from.last_name
else ctx.forward_from.first_name
)
elif ctx.forward_from_chat:
return ctx.forward_from_chat.title
else:
return ""
elif ctx.from_user:
if ctx.from_user.last_name:
return f"{ctx.from_user.first_name} {ctx.from_user.last_name}"
else:
return ctx.from_user.first_name
elif ctx.sender_chat:
return ctx.sender_chat.title
else:
return ""
# ---------------------------------------------------------------------------------------------------
async def get_message_sender_username(ctx: Message):
if ctx.forward_date:
if (
not ctx.forward_sender_name
and not ctx.forward_from
and ctx.forward_from_chat
and ctx.forward_from_chat.username
):
return ctx.forward_from_chat.username
elif (
not ctx.forward_sender_name
and not ctx.forward_from
and ctx.forward_from_chat
or ctx.forward_sender_name
or not ctx.forward_from
):
return ""
else:
return ctx.forward_from.username or ""
elif ctx.from_user and ctx.from_user.username:
return ctx.from_user.username
elif (
ctx.from_user
or ctx.sender_chat
and not ctx.sender_chat.username
or not ctx.sender_chat
):
return ""
else:
return ctx.sender_chat.username
# ------------------------------------------------------------------------
async def get_message_sender_photo(ctx: Message):
if ctx.forward_date:
if (
not ctx.forward_sender_name
and not ctx.forward_from
and ctx.forward_from_chat
and ctx.forward_from_chat.photo
):
return {
"small_file_id": ctx.forward_from_chat.photo.small_file_id,
"small_photo_unique_id": ctx.forward_from_chat.photo.small_photo_unique_id,
"big_file_id": ctx.forward_from_chat.photo.big_file_id,
"big_photo_unique_id": ctx.forward_from_chat.photo.big_photo_unique_id,
}
elif (
not ctx.forward_sender_name
and not ctx.forward_from
and ctx.forward_from_chat
or ctx.forward_sender_name
or not ctx.forward_from
):
return ""
else:
return (
{
"small_file_id": ctx.forward_from.photo.small_file_id,
"small_photo_unique_id": ctx.forward_from.photo.small_photo_unique_id,
"big_file_id": ctx.forward_from.photo.big_file_id,
"big_photo_unique_id": ctx.forward_from.photo.big_photo_unique_id,
}
if ctx.forward_from.photo
else ""
)
elif ctx.from_user and ctx.from_user.photo:
return {
"small_file_id": ctx.from_user.photo.small_file_id,
"small_photo_unique_id": ctx.from_user.photo.small_photo_unique_id,
"big_file_id": ctx.from_user.photo.big_file_id,
"big_photo_unique_id": ctx.from_user.photo.big_photo_unique_id,
}
elif (
ctx.from_user
or ctx.sender_chat
and not ctx.sender_chat.photo
or not ctx.sender_chat
):
return ""
else:
return {
"small_file_id": ctx.sender_chat.photo.small_file_id,
"small_photo_unique_id": ctx.sender_chat.photo.small_photo_unique_id,
"big_file_id": ctx.sender_chat.photo.big_file_id,
"big_photo_unique_id": ctx.sender_chat.photo.big_photo_unique_id,
}
# ---------------------------------------------------------------------------------------------------
async def get_text_or_caption(ctx: Message):
if ctx.text:
return ctx.text
elif ctx.caption:
return ctx.caption
else:
return ""
# ---------------------------------------------------------------------------------------------------
async def pyrogram_to_quotly(messages, is_reply):
if not isinstance(messages, list):
messages = [messages]
payload = {
"type": "quote",
"format": "png",
"backgroundColor": "#1b1429",
"messages": [],
}
for message in messages:
the_message_dict_to_append = {}
if message.entities:
the_message_dict_to_append["entities"] = [
{
"type": entity.type.name.lower(),
"offset": entity.offset,
"length": entity.length,
}
for entity in message.entities
]
elif message.caption_entities:
the_message_dict_to_append["entities"] = [
{
"type": entity.type.name.lower(),
"offset": entity.offset,
"length": entity.length,
}
for entity in message.caption_entities
]
else:
the_message_dict_to_append["entities"] = []
the_message_dict_to_append["chatId"] = await get_message_sender_id(message)
the_message_dict_to_append["text"] = await get_text_or_caption(message)
the_message_dict_to_append["avatar"] = True
the_message_dict_to_append["from"] = {}
the_message_dict_to_append["from"]["id"] = await get_message_sender_id(message)
the_message_dict_to_append["from"]["name"] = await get_message_sender_name(message)
the_message_dict_to_append["from"]["username"] = await get_message_sender_username(message)
the_message_dict_to_append["from"]["type"] = message.chat.type.name.lower()
the_message_dict_to_append["from"]["photo"] = await get_message_sender_photo(message)
if message.reply_to_message and is_reply:
the_message_dict_to_append["replyMessage"] = {
"name": await get_message_sender_name(message.reply_to_message),
"text": await get_text_or_caption(message.reply_to_message),
"chatId": await get_message_sender_id(message.reply_to_message),
}
else:
the_message_dict_to_append["replyMessage"] = {}
payload["messages"].append(the_message_dict_to_append)
r = await fetch.post("https://bot.lyo.su/quote/generate.png", json=payload)
if not r.is_error:
return r.read()
else:
raise QuotlyException(await r.aread())
# ------------------------------------------------------------------------------------------
def isArgInt(txt) -> list:
count = txt
try:
count = int(count)
return [True, count]
except ValueError:
return [False, 0]
# ---------------------------------------------------------------------------------------------------
@app.on_message(filters.command(["q"]) & filters.reply)
async def msg_quotly_cmd(client, ctx: Message):
args = ctx.text.split()
is_reply = False
quote_range = 1
# /q r or /q <number>
if len(args) > 1:
if args[1].lower() == "r":
is_reply = True
else:
try:
quote_range = int(args[1])
if quote_range < 2 or quote_range > 10:
return await ctx.reply("Invalid range (2-10)")
except ValueError:
return await ctx.reply("Invalid argument. Use /q r or /q <number> (2-10)")
if quote_range > 1:
try:
messages = [
i
for i in await client.get_messages(
chat_id=ctx.chat.id,
message_ids=range(
ctx.reply_to_message.id,
ctx.reply_to_message.id + quote_range
),
replies=-1,
)
if not i.empty and not i.media
]
except Exception:
return await ctx.reply_text("🤷🏻‍♂️")
try:
make_quotly = await pyrogram_to_quotly(messages, is_reply=is_reply)
bio_sticker = BytesIO(make_quotly)
bio_sticker.name = "misskatyquote_sticker.webp"
return await ctx.reply_sticker(bio_sticker)
except Exception:
return await ctx.reply("🤷🏻‍♂️")
else:
try:
messages_one = await client.get_messages(
chat_id=ctx.chat.id, message_ids=ctx.reply_to_message.id, replies=-1
)
messages = [messages_one]
except Exception:
return await ctx.reply("🤷🏻‍♂️")
try:
make_quotly = await pyrogram_to_quotly(messages, is_reply=is_reply)
bio_sticker = BytesIO(make_quotly)
bio_sticker.name = "misskatyquote_sticker.webp"
return await ctx.reply_sticker(bio_sticker)
except Exception as e:
return await ctx.reply(f"ERROR: {e}")