RUTGS / app.py
Dmtlant's picture
Create app.py
f1bfa1a verified
import os
import requests
import zeep
from datetime import datetime, timedelta
import telegram
import time
# Получение токенов
HF_TOKEN = os.getenv("HF_TOKEN")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
CHAT_ID = "-4620812399" # Ваш чат Telegram
# Клиент SOAP для получения RUONIA
wsdl_url = 'https://cbr.ru/DailyInfoWebServ/DailyInfo.asmx?WSDL'
client = zeep.Client(wsdl=wsdl_url)
def get_ruonia(start_date, end_date):
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
try:
data = client.service.Ruonia(fromDate=start_str, ToDate=end_str)
return data
except Exception as e:
return []
def summarize_ruonia(data):
summary = []
total_rate = 0
total_volume = 0
for entry in data:
date = entry['D0'].strftime('%d.%m.%Y')
rate = float(entry['RUONIA'])
volume = float(entry['Volume'])
total_rate += rate
total_volume += volume
summary.append(f"📅 {date}: Ставка RUONIA: {rate:.4f}%, Объём: {volume:.2f} млн руб.")
avg_rate = total_rate / len(data) if data else 0
return "\n".join(summary), avg_rate, total_volume
def get_weather():
url = "https://wttr.in/Moscow?format=3"
try:
res = requests.get(url)
if res.status_code == 200:
return res.text
else:
return "Погода в Москве: не удалось получить данные"
except Exception:
return "Погода в Москве: ошибка подключения"
def ask_hf(prompt):
headers = {
"Authorization": f"Bearer {HF_TOKEN}",
"Content-Type": "application/json"
}
payload = {"inputs": prompt}
url = "https://api-inference.huggingface.co/models/HuggingFaceH4/zephyr-7b-beta"
try:
res = requests.post(url, headers=headers, json=payload, timeout=60)
output = res.json()
if isinstance(output, list) and "generated_text" in output[0]:
return output[0]["generated_text"]
else:
return "⚠️ Не удалось получить ответ нейросети"
except Exception:
return "⚠️ Ошибка запроса к HF API"
def send_telegram_message(text):
bot = telegram.Bot(token=TELEGRAM_TOKEN)
bot.send_message(chat_id=CHAT_ID, text=text, parse_mode=telegram.ParseMode.HTML)
def generate_daily_report():
today = datetime.today()
start = today.replace(day=1)
end = today
ruonia_data = get_ruonia(start, end)
ruonia_summary, avg_rate, total_vol = summarize_ruonia(ruonia_data)
weather = get_weather()
prompt = (
f"Ты — эксперт по экономике и геополитике. Вот данные:\n\n"
f"{ruonia_summary}\n\n"
f"Погода в Москве: {weather}\n\n"
f"Сделай краткий вывод на русском: как это связано с ситуацией в стране и мире, кратко, в 3-4 предложениях."
)
ai_comment = ask_hf(prompt)
message = (
"<b>📰 Экономическая сводка за {}</b>\n\n"
"{}\n\n"
"📊 Средняя ставка RUONIA: {:.2f}%\n"
"💰 Общий объём: {:.2f} млн руб.\n"
"🌦️ {}\n\n"
"🤖 Мнение нейросети:\n{}\n"
).format(end.strftime('%d.%m.%Y'), ruonia_summary, avg_rate, total_vol, weather, ai_comment)
send_telegram_message(message)
# Запускаем отправку отчёта
if __name__ == "__main__":
generate_daily_report()