Spaces:
Sleeping
Sleeping
| import asyncio | |
| import zipfile | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Collection | |
| import pandas as pd | |
| import gradio as gr | |
| from telethon import TelegramClient, types, errors | |
| from utils.auth import AuthState, ClientConnector | |
| from utils.validation import Validator | |
| MESSAGE_DICT = dict[str, str | int | datetime | None] | |
| DEFAULT_PARSE_KWARGS = dict( | |
| limit=None, | |
| offset_date=None, | |
| reverse=False, | |
| ) | |
@dataclass
class Chat:
    """A Telegram entity queued for parsing, plus display metadata."""

    chat: types.TLObject  # the Telethon entity itself
    chat_name: str | None  # human-readable name; None when the entity has none
    chat_username: str  # the address the user typed to look the entity up
    chat_type: str  # 'Chat' for a user dialog, 'Channel/Group' otherwise
    chat_id: int  # the entity's `id` attribute

    @classmethod
    def from_telethon_chat(cls, chat: types.TLObject, chat_username: str) -> 'Chat':
        """Build a `Chat` from a resolved Telethon entity.

        Args:
            chat: Entity object; a `types.User` is treated as a private
                dialog, anything else is expected to expose `title`.
            chat_username: Address the entity was resolved from.

        Returns:
            A populated `Chat` instance.
        """
        if isinstance(chat, types.User):
            chat_type = 'Chat'
            # Either name part may be None; join only the parts that exist
            # so the display name never contains the literal text "None".
            name_parts = (chat.first_name, chat.last_name)
            chat_name = ' '.join(part for part in name_parts if part) or None
        else:
            chat_type = 'Channel/Group'
            chat_name = chat.title
        return cls(chat, chat_name, chat_username, chat_type, chat.id)

    def get_chat_info(self) -> str:
        """Return a one-line, human-readable summary of this chat."""
        return (
            f'Chat name: {self.chat_name}, '
            f'Chat type: {self.chat_type}, '
            f'Chat ID: {self.chat_id}'
        )
class Parser:
    """Download Telegram chat histories and export them as CSV files."""

    # Directory that receives the per-chat CSV files and the zip archive.
    parse_results_dir = Path('parse_results_dir')

    # Characters replaced with '_' when a chat title is used in a file name
    # (path separators and characters reserved on common file systems).
    _UNSAFE_FILENAME_CHARS = frozenset('\\/:*?"<>|')

    @staticmethod
    def _entity_name(entity) -> str | None:
        """Return a display name for a Telethon entity, or None if it has none."""
        if isinstance(entity, types.User):
            # Either part may be None; never render the literal text "None".
            name_parts = (entity.first_name, entity.last_name)
            return ' '.join(part for part in name_parts if part) or None
        # Channels and basic groups both carry `title`.
        return getattr(entity, 'title', None)

    @staticmethod
    def message_to_dict(message: types.Message) -> MESSAGE_DICT | None:
        """Flatten a Telethon message into one CSV row.

        Args:
            message: Message yielded by `client.iter_messages`.

        Returns:
            A dict with date, chat and sender columns plus the text, or
            None when the message carries no text.
        """
        text = message.text or message.message
        if not text:
            return None

        chat = message.chat
        sender = message.sender
        if isinstance(sender, types.User):
            sender_id = sender.id
            username = sender.username
            first_name = sender.first_name
            last_name = sender.last_name
        else:
            # Sender is a channel/group or was not delivered with the message.
            sender_id = message.sender_id
            username = getattr(sender, 'username', None)
            first_name = None
            last_name = None

        return {
            'date': message.date,
            'chat_type': type(chat).__name__,
            'chat_name': Parser._entity_name(chat),
            'chat_id': chat.id,
            'sender_type': type(sender).__name__,
            'sender_username': username,
            'sender_first_name': first_name,
            'sender_last_name': last_name,
            'sender_id': sender_id,
            'text': text,
        }

    @classmethod
    async def get_messages_from_chat(
        cls,
        client: TelegramClient,
        chat: types.TLObject,
        parse_chats_pb_info: str,
        **parse_kwargs,
    ) -> list[MESSAGE_DICT]:
        """Collect the text messages of one chat in chronological order.

        Args:
            client: Telethon client; it is entered as an async context
                manager for the duration of the download.
            chat: Entity whose history is read.
            parse_chats_pb_info: Prefix for the progress-bar description.
            **parse_kwargs: Forwarded to `client.iter_messages`; `limit`
                and `reverse` are also read here and default to None and
                False when absent.

        Returns:
            Row dicts for every message that has text, oldest first.
        """
        limit = parse_kwargs.get('limit')
        newest_first = not parse_kwargs.get('reverse', False)
        message_dicts = []
        message_count = 0

        async with client:
            progress = gr.Progress()
            async for message in client.iter_messages(entity=chat, **parse_kwargs):
                message_count += 1
                if message_count % 1000 == 0:
                    # Pause briefly after every 1000 messages.
                    await asyncio.sleep(1)

                message_dict = cls.message_to_dict(message)
                if message_dict is not None:
                    message_dicts.append(message_dict)

                if limit:
                    progress(
                        message_count / limit,
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/{limit}',
                    )
                else:
                    # Total is unknown: report (done, None) rather than a
                    # bare count, which is not a valid 0..1 fraction.
                    progress(
                        (message_count, None),
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/?',
                    )

        if newest_first:
            # Messages arrived newest-first; flip them to chronological order.
            message_dicts.reverse()
        return message_dicts

    @classmethod
    async def parse_chats(
        cls,
        auth_state: AuthState,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
        *parse_args,
    ) -> tuple[str, list[Path]]:
        """Parse every chat in `chats_list` and write one CSV per chat.

        Args:
            auth_state: Provides the session used to build the client.
            chats_list: Chats to parse.
            api_id: Telegram API id.
            api_hash: Telegram API hash.
            *parse_args: Positional values for the keys of
                `DEFAULT_PARSE_KWARGS`, in declaration order; keys with no
                supplied value keep their default.

        Returns:
            A newline-terminated log (one line per chat) and the paths of
            the CSV files that were written.
        """
        csv_paths: list[Path] = []
        if not chats_list:
            return 'Список чатов для парсинга пустой', csv_paths

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован', csv_paths

        # Start from the defaults so a short `parse_args` cannot leave
        # `limit`/`reverse` undefined further down.
        parse_kwargs = {
            **DEFAULT_PARSE_KWARGS,
            **dict(zip(DEFAULT_PARSE_KWARGS, parse_args)),
        }

        progress = gr.Progress()
        total_chats = len(chats_list)
        log_lines = []
        for i, chat in enumerate(chats_list, start=1):
            parse_chats_pb_info = f'Parsing chats {i}/{total_chats}'
            try:
                message_dicts = await cls.get_messages_from_chat(
                    client, chat.chat, parse_chats_pb_info, **parse_kwargs
                )
                if not message_dicts:
                    log_lines.append(
                        f'Из чата {chat.chat_username} не было извлечено ни одного сообщения'
                    )
                else:
                    csv_paths.append(cls.messages_to_csv(message_dicts))
                    log_lines.append(
                        f'Успешный парсинг чата {chat.chat_username}, '
                        f'кол-во сообщений: {len(message_dicts)}'
                    )
            except Exception as ex:
                # Best effort: a failure in one chat is logged and must not
                # abort the remaining chats.
                log_lines.append(
                    f'Ошибка при парсинге чата {chat.chat_username}, код ошибки: {ex}'
                )
            progress(i / total_chats, desc=parse_chats_pb_info)

        return ''.join(f'{line}\n' for line in log_lines), csv_paths

    @classmethod
    def messages_to_csv(cls, message_dicts: Collection[MESSAGE_DICT]) -> Path:
        """Write message rows to a CSV file named after their chat.

        Args:
            message_dicts: Non-empty collection of rows from one chat.

        Returns:
            Path of the written CSV inside `parse_results_dir`.

        Raises:
            IndexError: If `message_dicts` is empty.
        """
        if not message_dicts:
            raise IndexError('message_dicts is empty, nothing to export')

        first_row = next(iter(message_dicts))
        # Fall back to the chat id when the chat has no display name.
        raw_name = first_row.get('chat_name') or first_row.get('chat_id', '')
        # Chat titles are user-controlled text: neutralise separators and
        # control characters before using the title in a path.
        safe_name = ''.join(
            '_' if ch in cls._UNSAFE_FILENAME_CHARS or ord(ch) < 32 else ch
            for ch in str(raw_name)
        )
        csv_path = cls.parse_results_dir / f'telegram_history_{safe_name}.csv'
        pd.DataFrame(list(message_dicts)).to_csv(csv_path, index=False)
        return csv_path

    @classmethod
    def zip_files(cls, file_paths: Collection[Path]) -> Path:
        """Pack the given files into `parse_results_csv.zip`.

        Args:
            file_paths: Files to add; each is stored under its given path.

        Returns:
            Path of the archive inside `parse_results_dir`.
        """
        zip_filepath = cls.parse_results_dir / 'parse_results_csv.zip'
        # CSV is plain text, so deflate instead of the default "stored" mode.
        with zipfile.ZipFile(zip_filepath, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
            for file_path in file_paths:
                zipf.write(file_path, arcname=file_path)
        return zip_filepath

    @staticmethod
    def get_chats_info(chats_list: list[Chat]) -> str:
        """Return a numbered, newline-terminated summary line per chat."""
        return ''.join(
            f'{i}: {chat.get_chat_info()}\n'
            for i, chat in enumerate(chats_list, start=1)
        )

    @staticmethod
    async def get_chat(client: TelegramClient, chat_username: str) -> types.TLObject:
        """Resolve a chat address to a Telethon entity.

        Connects the client for the duration of the call when it is not
        already connected.

        Args:
            client: Telethon client used for the lookup.
            chat_username: Address to resolve.

        Returns:
            The resolved entity.

        Raises:
            errors.UsernameInvalidError: The username is unoccupied or invalid.
            Exception: Any other failure, wrapped with a readable message.
        """
        try:
            if client.is_connected():
                chat = await client.get_entity(chat_username)
            else:
                async with client:
                    chat = await client.get_entity(chat_username)
        except (errors.UsernameNotOccupiedError, errors.UsernameInvalidError) as ex:
            log_msg = f'Чат или канал {chat_username} не найден или введен неверно'
            # NOTE(review): Telethon RPC error constructors may interpret
            # their argument as the failed request - confirm that this
            # message actually shows up in str(exception).
            raise errors.UsernameInvalidError(log_msg) from ex
        except Exception as ex:
            log_msg = f'Ошибка при получении объекта чата, код ошибки: {ex}'
            raise Exception(log_msg) from ex
        return chat

    @classmethod
    async def add_chat_to_chats_list(
        cls,
        auth_state: AuthState,
        chats_usernames: str,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
    ) -> str:
        """Resolve whitespace-separated chat addresses and append new ones.

        `chats_list` is modified in place. Per-address problems are shown
        through `gr.Info` and do not stop the remaining addresses.

        Args:
            auth_state: Provides the session used to build the client.
            chats_usernames: One or more addresses separated by whitespace.
            chats_list: Current list of chats; new chats are appended.
            api_id: Telegram API id.
            api_hash: Telegram API hash.

        Returns:
            The numbered summary of `chats_list`, or an error message when
            the input is blank or the client is not authorised.
        """
        if not chats_usernames.strip():
            return 'Не заданы адрес/адреса чатов для добавления'

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован'

        # Entities already in the list, keyed by (entity class, id).
        # Comparing the raw entity against the `Chat` wrappers can never
        # match, so duplicates are detected through this key instead.
        known = {(type(item.chat), item.chat_id) for item in chats_list}

        for chat_username in chats_usernames.split():
            try:
                telethon_chat = await cls.get_chat(client, chat_username)
                key = (type(telethon_chat), telethon_chat.id)
                if key in known:
                    gr.Info(f'Чат {chat_username} уже есть в списке')
                else:
                    chats_list.append(Chat.from_telethon_chat(telethon_chat, chat_username))
                    known.add(key)
            except Exception as ex:
                # Best effort: report the problem and continue with the
                # next address.
                gr.Info(str(ex))

        return cls.get_chats_info(chats_list)
# Make sure the output directory exists as soon as the module is imported;
# `parents=True` keeps this working if the path is ever made nested.
Parser.parse_results_dir.mkdir(parents=True, exist_ok=True)