File size: 2,118 Bytes
054900e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable
from aiogram import BaseMiddleware, Bot
from aiogram.enums import ChatMemberStatus
from aiogram.exceptions import TelegramNotFound
from aiogram.methods import GetChatMember
if TYPE_CHECKING:
from collections.abc import Awaitable
from aiogram.types import TelegramObject, User
class ChannelSubscribeMiddleware(BaseMiddleware):
    """Pass an event to its handler only if the sender is in every required chat.

    Events that carry no ``from_user`` are passed through unchanged. For all
    other events the sender is looked up with ``GetChatMember`` in each
    configured chat; if any lookup shows the user is not a member, the event
    is dropped and the handler is never called.

    The middleware is only guaranteed to work for other users if the bot is
    an administrator in the chat.
    """

    def __init__(self, chat_ids: list[int | str] | int | str) -> None:
        """Remember which chats a user must belong to.

        Args:
            chat_ids: A single chat id / ``@username``, or a collection of them.
        """
        self.chat_ids = chat_ids
        super().__init__()

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Run ``handler`` for subscribed users; silently drop the event otherwise.

        Args:
            handler: The next handler in the middleware chain.
            event: The incoming event; its ``from_user`` attribute, when
                present, identifies the sender.
            data: Context data; must contain the ``Bot`` under ``"bot"``.

        Returns:
            Whatever ``handler`` returns, or ``None`` when the event is dropped.
        """
        user: User | None = getattr(event, "from_user", None)
        if not user:
            # Nothing to check against (no sender on this event type).
            return await handler(event, data)

        bot: Bot = data["bot"]
        if await self._is_subscribed(bot=bot, user_id=user.id):
            return await handler(event, data)

        # TODO: tell the user to subscribe first, e.g.
        # await message.answer(_("first subscribe to this channel(s)/group(s)"), reply_markup=)
        return None

    def _required_chats(self) -> list[int | str]:
        """Return ``self.chat_ids`` normalized to a list of chat identifiers."""
        chat_ids = self.chat_ids
        if isinstance(chat_ids, (str, int)):
            return [chat_ids]
        # Any other iterable (list, tuple, set, ...) is checked element by
        # element instead of being silently ignored.
        return list(chat_ids)

    @staticmethod
    def _counts_as_member(member: Any) -> bool:
        """Decide whether a ``GetChatMember`` result means the user is in the chat."""
        status = member.status
        if status in {ChatMemberStatus.LEFT, ChatMemberStatus.KICKED}:
            return False
        if status == ChatMemberStatus.RESTRICTED:
            # A restricted user may or may not currently be in the chat; the
            # result object carries an explicit ``is_member`` flag for that.
            return bool(getattr(member, "is_member", False))
        return True

    async def _is_subscribed(self, bot: Bot, user_id: int) -> bool:
        """Check that ``user_id`` is a member of every configured chat.

        Args:
            bot: Bot instance used to execute ``GetChatMember``.
            user_id: Identifier of the user to look up.

        Returns:
            ``False`` as soon as one chat reports the user as absent or the
            lookup raises ``TelegramNotFound``; ``True`` otherwise.
        """
        for chat_id in self._required_chats():
            try:
                member = await bot(GetChatMember(chat_id=chat_id, user_id=user_id))
            except TelegramNotFound:
                return False
            if not self._counts_as_member(member):
                return False
        return True
|