import os import requests import zeep from datetime import datetime, timedelta import telegram import time # Получение токенов HF_TOKEN = os.getenv("HF_TOKEN") TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") CHAT_ID = "-4620812399" # Ваш чат Telegram # Клиент SOAP для получения RUONIA wsdl_url = 'https://cbr.ru/DailyInfoWebServ/DailyInfo.asmx?WSDL' client = zeep.Client(wsdl=wsdl_url) def get_ruonia(start_date, end_date): start_str = start_date.strftime('%Y-%m-%d') end_str = end_date.strftime('%Y-%m-%d') try: data = client.service.Ruonia(fromDate=start_str, ToDate=end_str) return data except Exception as e: return [] def summarize_ruonia(data): summary = [] total_rate = 0 total_volume = 0 for entry in data: date = entry['D0'].strftime('%d.%m.%Y') rate = float(entry['RUONIA']) volume = float(entry['Volume']) total_rate += rate total_volume += volume summary.append(f"📅 {date}: Ставка RUONIA: {rate:.4f}%, Объём: {volume:.2f} млн руб.") avg_rate = total_rate / len(data) if data else 0 return "\n".join(summary), avg_rate, total_volume def get_weather(): url = "https://wttr.in/Moscow?format=3" try: res = requests.get(url) if res.status_code == 200: return res.text else: return "Погода в Москве: не удалось получить данные" except Exception: return "Погода в Москве: ошибка подключения" def ask_hf(prompt): headers = { "Authorization": f"Bearer {HF_TOKEN}", "Content-Type": "application/json" } payload = {"inputs": prompt} url = "https://api-inference.huggingface.co/models/HuggingFaceH4/zephyr-7b-beta" try: res = requests.post(url, headers=headers, json=payload, timeout=60) output = res.json() if isinstance(output, list) and "generated_text" in output[0]: return output[0]["generated_text"] else: return "⚠️ Не удалось получить ответ нейросети" except Exception: return "⚠️ Ошибка запроса к HF API" def send_telegram_message(text): bot = telegram.Bot(token=TELEGRAM_TOKEN) bot.send_message(chat_id=CHAT_ID, text=text, parse_mode=telegram.ParseMode.HTML) def generate_daily_report(): today = datetime.today() start = today.replace(day=1) end = today ruonia_data = get_ruonia(start, end) ruonia_summary, avg_rate, total_vol = summarize_ruonia(ruonia_data) weather = get_weather() prompt = ( f"Ты — эксперт по экономике и геополитике. Вот данные:\n\n" f"{ruonia_summary}\n\n" f"Погода в Москве: {weather}\n\n" f"Сделай краткий вывод на русском: как это связано с ситуацией в стране и мире, кратко, в 3-4 предложениях." ) ai_comment = ask_hf(prompt) message = ( "📰 Экономическая сводка за {}\n\n" "{}\n\n" "📊 Средняя ставка RUONIA: {:.2f}%\n" "💰 Общий объём: {:.2f} млн руб.\n" "🌦️ {}\n\n" "🤖 Мнение нейросети:\n{}\n" ).format(end.strftime('%d.%m.%Y'), ruonia_summary, avg_rate, total_vol, weather, ai_comment) send_telegram_message(message) # Запускаем отправку отчёта if __name__ == "__main__": generate_daily_report()