File size: 2,118 Bytes
054900e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable

from aiogram import BaseMiddleware, Bot
from aiogram.enums import ChatMemberStatus
from aiogram.exceptions import TelegramNotFound
from aiogram.methods import GetChatMember

if TYPE_CHECKING:
    from collections.abc import Awaitable

    from aiogram.types import TelegramObject, User


class ChannelSubscribeMiddleware(BaseMiddleware):
    """Pass an event to its handler only if the sender belongs to every required chat.

    Membership is looked up with ``GetChatMember`` for each configured chat.
    Events that carry no ``from_user`` are passed through unchecked.

    The middleware is only guaranteed to work for other users if the bot is
    an administrator in the chat.
    """

    def __init__(self, chat_ids: list[int | str] | int | str) -> None:
        """Store the chat(s) a user must be a member of.

        Args:
            chat_ids: A single chat id / ``@username``, or a list of them.
        """
        self.chat_ids = chat_ids
        super().__init__()

    async def __call__(
        self,
        handler: Callable[[TelegramObject, dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: dict[str, Any],
    ) -> Any:
        """Run ``handler`` for subscribed users; drop the event otherwise.

        Args:
            handler: The next handler in the middleware chain.
            event: The incoming event; its ``from_user`` attribute, if any,
                identifies the user to check.
            data: Middleware context; must contain the ``"bot"`` key.

        Returns:
            The handler's result, or ``None`` when the user is not subscribed.
        """
        user: User | None = getattr(event, "from_user", None)
        if not user:
            # Nothing to check against (event has no sender): let it through.
            return await handler(event, data)

        bot: Bot = data["bot"]

        if await self._is_subscribed(bot=bot, user_id=user.id):
            return await handler(event, data)

        # TODO: tell the user to subscribe to the required channel(s)/group(s)
        # first, e.g. by answering with a message and a join keyboard.
        return None

    async def _is_subscribed(self, bot: Bot, user_id: int) -> bool:
        """Return ``True`` if the user is a current member of every configured chat.

        A single id and a one-element list are treated identically. A user
        counts as *not* subscribed when the lookup raises ``TelegramNotFound``,
        when their status is ``LEFT`` or ``KICKED``, or when their status is
        ``RESTRICTED`` and the member object does not report ``is_member``.

        Args:
            bot: Bot instance used to issue the ``GetChatMember`` requests.
            user_id: Telegram id of the user to check.
        """
        # Normalise to an iterable so there is exactly one checking code path.
        if isinstance(self.chat_ids, (str, int)):
            chat_ids = [self.chat_ids]
        else:
            chat_ids = self.chat_ids

        for chat_id in chat_ids:
            try:
                member = await bot(GetChatMember(chat_id=chat_id, user_id=user_id))
            except TelegramNotFound:
                return False

            if member.status in {ChatMemberStatus.LEFT, ChatMemberStatus.KICKED}:
                return False

            # A restricted user may or may not currently be in the chat; the
            # ``is_member`` flag on the restricted member object tells which.
            if member.status == ChatMemberStatus.RESTRICTED and not getattr(member, "is_member", False):
                return False

        return True