Spaces:
No application file
No application file
from flask import Flask, request, jsonify, render_template, flash, redirect, url_for, current_app | |
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user | |
from werkzeug.security import generate_password_hash, check_password_hash | |
from transformers import pipeline | |
from flask import session | |
import torch | |
from pydub import AudioSegment | |
import os | |
import io | |
import uuid | |
from datetime import datetime | |
import sqlite3 | |
from pathlib import Path | |
import whisper | |
from extensions import db, login_manager | |
import json | |
from admin import admin_bp | |
from flask_migrate import Migrate | |
from models import User | |
from werkzeug.utils import secure_filename | |
from forms import EditProfileForm | |
instance_path = Path(__file__).parent / 'instance' | |
instance_path.mkdir(exist_ok=True, mode=0o755) | |
app = Flask(__name__) | |
app.secret_key = 'очень_сложный_секретный_ключ_здесь' | |
db_path = instance_path / 'chats.db' | |
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' | |
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | |
# Инициализация Flask-Login | |
db.init_app(app) | |
login_manager.init_app(app) | |
login_manager.login_view = 'welcome' | |
migrate = Migrate(app, db) | |
# Инициализация моделей | |
def init_models(): | |
try: | |
emotion_map = { | |
'joy': '😊 Радость', | |
'neutral': '😐 Нейтрально', | |
'anger': '😠 Злость', | |
'sadness': '😢 Грусть', | |
'surprise': '😲 Удивление' | |
} | |
speech_to_text_model = whisper.load_model("base") | |
text_classifier = pipeline( | |
"text-classification", | |
model="cointegrated/rubert-tiny2-cedr-emotion-detection" | |
) | |
audio_classifier = pipeline( | |
"audio-classification", | |
model="superb/hubert-large-superb-er" | |
) | |
return { | |
'emotion_map': emotion_map, | |
'speech_to_text_model': speech_to_text_model, | |
'text_classifier': text_classifier, | |
'audio_classifier': audio_classifier | |
} | |
except Exception as e: | |
print(f"Ошибка загрузки моделей: {e}") | |
return None | |
models = init_models() | |
if not models: | |
raise RuntimeError("Не удалось загрузить модели") | |
def datetimeformat(value, format='%d.%m.%Y %H:%M'): | |
if value is None: | |
return "" | |
return value.strftime(format) | |
def utility_processor(): | |
return { | |
'emotion_map': { | |
'joy': '😊 Радость', | |
'neutral': '😐 Нейтрально', | |
'anger': '😠 Злость', | |
'sadness': '😢 Грусть', | |
'surprise': '😲 Удивление' | |
}, | |
'get_emotion_color': lambda emotion: { | |
'joy': '#00b894', | |
'neutral': '#636e72', | |
'anger': '#d63031', | |
'sadness': '#0984e3', | |
'surprise': '#fdcb6e' | |
}.get(emotion, '#4a4ae8') | |
} | |
# Импорт Blueprint | |
from auth import auth_bp | |
from profile import profile_bp | |
app.register_blueprint(auth_bp) | |
app.register_blueprint(profile_bp) | |
app.register_blueprint(admin_bp, url_prefix='/admin') | |
# Делаем переменные доступными | |
emotion_map = models['emotion_map'] | |
speech_to_text_model = models['speech_to_text_model'] | |
text_classifier = models['text_classifier'] | |
audio_classifier = models['audio_classifier'] | |
def create_admin(): | |
"""Создание администратора""" | |
email = input("Введите email: ") | |
password = input("Введите пароль: ") | |
user = User.query.filter_by(email=email).first() | |
if user: | |
user.is_admin = True | |
user.set_password(password) | |
else: | |
user = User(email=email, username=email, is_admin=True) | |
user.set_password(password) | |
db.session.add(user) | |
db.session.commit() | |
print(f"Администратор {email} создан") | |
def transcribe_audio(audio_path): | |
"""Преобразование аудио в текст с помощью Whisper""" | |
if not speech_to_text_model: | |
return None | |
try: | |
result = speech_to_text_model.transcribe(audio_path, language="ru") | |
return result["text"] | |
except Exception as e: | |
print(f"Ошибка преобразования аудио в текст: {e}") | |
return None | |
# Инициализация Flask-Login | |
login_manager = LoginManager(app) | |
login_manager.login_view = 'auth_bp.login' | |
login_manager.login_message = "Для доступа к этой странице необходимо авторизоваться" | |
login_manager.login_message_category = "info" | |
def load_user(user_id): | |
from models import User | |
return User.query.get(int(user_id)) | |
# Инициализация БД | |
def get_db_connection(): | |
instance_path = Path('instance') | |
instance_path.mkdir(exist_ok=True) | |
db_path = instance_path / 'chats.db' | |
conn = sqlite3.connect(str(db_path)) | |
conn.row_factory = sqlite3.Row | |
return conn | |
def init_db(): | |
conn = get_db_connection() | |
try: | |
conn.execute(''' | |
CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
username TEXT UNIQUE NOT NULL, | |
email TEXT UNIQUE NOT NULL, | |
password_hash TEXT NOT NULL, | |
created_at TEXT DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
conn.execute(''' | |
CREATE TABLE IF NOT EXISTS chats ( | |
chat_id TEXT PRIMARY KEY, | |
user_id INTEGER, | |
created_at TEXT, | |
title TEXT, | |
FOREIGN KEY(user_id) REFERENCES users(id) | |
) | |
''') | |
conn.execute(''' | |
CREATE TABLE IF NOT EXISTS messages ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
chat_id TEXT, | |
sender TEXT, | |
content TEXT, | |
timestamp TEXT, | |
FOREIGN KEY(chat_id) REFERENCES chats(chat_id) | |
) | |
''') | |
conn.execute(''' | |
CREATE TABLE IF NOT EXISTS analysis_reports ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
user_id INTEGER, | |
content TEXT, | |
emotion TEXT, | |
confidence REAL, | |
created_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY(user_id) REFERENCES users(id) | |
) | |
''') | |
conn.commit() | |
finally: | |
conn.close() | |
init_db() | |
# Маршруты аутентификации | |
def login(): | |
if request.method == 'POST': | |
email = request.form.get('email') | |
password = request.form.get('password') | |
conn = get_db_connection() | |
user = conn.execute( | |
"SELECT id, username, email, password_hash FROM users WHERE email = ?", | |
(email,) | |
).fetchone() | |
conn.close() | |
if user and check_password_hash(user['password_hash'], password): | |
user_obj = User(id=user['id'], username=user['username'], | |
email=user['email'], password_hash=user['password_hash']) | |
login_user(user_obj) | |
session.pop('_flashes', None) | |
return redirect(url_for('index')) | |
else: | |
flash('Неверный email или пароль', 'danger') # <-- теперь здесь | |
return render_template('auth/login.html') | |
def register(): | |
if request.method == 'POST': | |
username = request.form.get('username') | |
email = request.form.get('email') | |
password = request.form.get('password') | |
confirm_password = request.form.get('confirm_password') | |
if password != confirm_password: | |
flash('Пароли не совпадают', 'danger') | |
return redirect(url_for('register')) | |
conn = get_db_connection() | |
try: | |
password_hash = generate_password_hash(password) | |
conn.execute( | |
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)", | |
(username, email, password_hash) | |
) | |
conn.commit() | |
flash('Регистрация прошла успешно! Теперь вы можете войти.', 'success') | |
return redirect(url_for('login')) | |
except sqlite3.IntegrityError: | |
flash('Пользователь с таким email или именем уже существует', 'danger') | |
finally: | |
conn.close() | |
return render_template('auth/register.html') | |
from werkzeug.security import check_password_hash, generate_password_hash | |
def edit_profile(): | |
form = EditProfileForm(obj=current_user) | |
if form.validate_on_submit(): | |
# Обновляем имя и почту | |
current_user.username = form.username.data | |
current_user.email = form.email.data | |
# Обновляем аватар | |
if form.avatar.data: | |
filename = secure_filename(form.avatar.data.filename) | |
unique_filename = f"{uuid.uuid4().hex}_{filename}" | |
avatar_path = os.path.join(current_app.root_path, 'static/avatars', unique_filename) | |
form.avatar.data.save(avatar_path) | |
current_user.avatar = unique_filename | |
# Обработка смены пароля | |
if form.current_password.data: | |
# Проверяем текущий пароль | |
if check_password_hash(current_user.password_hash, form.current_password.data): | |
# Меняем пароль на новый | |
current_user.password_hash = generate_password_hash(form.new_password.data) | |
flash('Пароль успешно изменён', 'success') | |
else: | |
flash('Текущий пароль неверный', 'danger') | |
return redirect(url_for('edit_profile')) | |
db.session.commit() | |
flash('Профиль обновлён', 'success') | |
return redirect(url_for('profile')) | |
return render_template('edit_profile.html', form=form) | |
# Основные маршруты | |
def welcome(): | |
return render_template("welcome.html") | |
def logout(): | |
session.clear() | |
logout_user() | |
return redirect(url_for('welcome')) | |
def index(): | |
if current_user.is_authenticated: | |
conn = get_db_connection() | |
try: | |
chats = conn.execute( | |
"SELECT chat_id, title FROM chats WHERE user_id = ? ORDER BY created_at DESC", | |
(current_user.id,) | |
).fetchall() | |
return render_template("index.html", chats=chats) | |
finally: | |
conn.close() | |
return redirect(url_for('welcome')) | |
def profile(): | |
conn = get_db_connection() | |
try: | |
# Запрашиваем все анализы пользователя | |
reports = conn.execute( | |
"SELECT * FROM analysis_reports WHERE user_id = ? ORDER BY created_at DESC", | |
(current_user.id,) | |
).fetchall() | |
# Статистика: общее количество анализов | |
total_reports = len(reports) | |
# Статистика: самая частая эмоция | |
emotion_counts = {} | |
for r in reports: | |
emotion_counts[r['emotion']] = emotion_counts.get(r['emotion'], 0) + 1 | |
most_common_emotion = max(emotion_counts, key=emotion_counts.get) if emotion_counts else None | |
return render_template( | |
"profile.html", | |
reports=reports, | |
total_reports=total_reports, | |
most_common_emotion=most_common_emotion, | |
emotion_map={ | |
'joy': '😊 Радость', | |
'neutral': '😐 Нейтрально', | |
'anger': '😠 Злость', | |
'sadness': '😢 Грусть', | |
'surprise': '😲 Удивление' | |
} | |
) | |
except Exception as e: | |
flash(f"Ошибка загрузки данных: {e}", "danger") | |
return redirect(url_for('index')) | |
finally: | |
conn.close() | |
def analyze_text(): | |
if not text_classifier: | |
return jsonify({"error": "Model not loaded"}), 500 | |
try: | |
data = request.get_json() | |
text = data.get("text", "").strip() | |
if not text: | |
return jsonify({"error": "Empty text"}), 400 | |
# Получаем предсказания модели | |
result = text_classifier(text) | |
# Проверяем структуру ответа | |
if not result or not isinstance(result, list): | |
return jsonify({"error": "Invalid model response"}), 500 | |
# Берем первый результат (самый вероятный) | |
prediction = result[0] if result else {} | |
# Проверяем наличие нужных полей | |
if not all(key in prediction for key in ['label', 'score']): | |
return jsonify({"error": "Invalid prediction format"}), 500 | |
# Сохраняем в базу данных | |
conn = get_db_connection() | |
conn.execute( | |
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
(current_user.id, text, prediction['label'], prediction['score']) | |
) | |
conn.commit() | |
conn.close() | |
return jsonify({ | |
"emotion": emotion_map.get(prediction['label'], "❓ Неизвестно"), | |
"confidence": float(prediction['score']) | |
}) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
def analyze_audio(): | |
if not audio_classifier or not speech_to_text_model: | |
return jsonify({"error": "Model not loaded"}), 500 | |
if 'audio' not in request.files: | |
return jsonify({'error': 'No audio file'}), 400 | |
try: | |
audio_file = request.files['audio'] | |
temp_path = "temp_audio.wav" | |
audio = AudioSegment.from_file(io.BytesIO(audio_file.read())) | |
audio = audio.set_frame_rate(16000).set_channels(1) | |
audio.export(temp_path, format="wav", codec="pcm_s16le") | |
transcribed_text = transcribe_audio(temp_path) | |
result = audio_classifier(temp_path) | |
os.remove(temp_path) | |
emotion_mapping = { | |
'hap': 'happy', | |
'sad': 'sad', | |
'neu': 'neutral', | |
'ang': 'angry' | |
} | |
emotions = {emotion_mapping.get(item['label'].lower(), 'neutral'): item['score'] | |
for item in result if item['label'].lower() in emotion_mapping} | |
dominant_emotion = max(emotions.items(), key=lambda x: x[1]) | |
response_map = { | |
'happy': '😊 Радость', | |
'sad': '😢 Грусть', | |
'angry': '😠 Злость', | |
'neutral': '😐 Нейтрально' | |
} | |
conn = get_db_connection() | |
conn.execute( | |
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
(current_user.id, transcribed_text, dominant_emotion[0], dominant_emotion[1]) | |
) | |
conn.commit() | |
conn.close() | |
return jsonify({ | |
'emotion': response_map.get(dominant_emotion[0], 'неизвестно'), | |
'confidence': round(dominant_emotion[1], 2), | |
'transcribed_text': transcribed_text if transcribed_text else "Не удалось распознать текст" | |
}) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
def analyze_telegram_chat(): | |
if 'file' not in request.files: | |
return jsonify({'error': 'No file uploaded'}), 400 | |
file = request.files['file'] | |
if file.filename.split('.')[-1].lower() != 'json': | |
return jsonify({'error': 'Invalid file format. Only JSON allowed'}), 400 | |
try: | |
data = json.load(file) | |
messages = [] | |
for msg in data.get('messages', []): | |
text = msg.get('text') | |
sender = msg.get('from') or msg.get('sender') or 'Неизвестный пользователь' | |
if isinstance(text, str) and len(text.strip()) > 5: | |
messages.append({ | |
'text': text, | |
'timestamp': msg.get('date', datetime.now().isoformat()), | |
'from': sender # <-- сохраняем имя отправителя | |
}) | |
if not messages: | |
return jsonify({'error': 'No valid text messages found'}), 400 | |
results = [] | |
for msg in messages[:500]: | |
prediction = text_classifier(msg['text'])[0] | |
results.append({ | |
'text': msg['text'], | |
'emotion': prediction['label'], | |
'confidence': prediction['score'], | |
'timestamp': msg['timestamp'], | |
'from': msg['from'] # <-- передаем дальше | |
}) | |
conn = get_db_connection() | |
conn.execute(''' | |
CREATE TABLE IF NOT EXISTS telegram_analysis ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
user_id INTEGER, | |
data TEXT, | |
created_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY(user_id) REFERENCES users(id) | |
) | |
''') | |
conn.execute( | |
"INSERT INTO telegram_analysis (user_id, data) VALUES (?, ?)", | |
(current_user.id, json.dumps(results)) | |
) | |
conn.commit() | |
return jsonify({ | |
'status': 'success', | |
'message_count': len(results), | |
}) | |
except Exception as e: | |
print(f"Error during analysis: {e}") | |
return jsonify({'error': f'Server error: {str(e)}'}), 500 | |
finally: | |
conn.close() | |
def get_chats(): | |
conn = get_db_connection() | |
try: | |
chats = conn.execute( | |
"SELECT chat_id, title, created_at FROM chats WHERE user_id = ? ORDER BY created_at DESC", | |
(current_user.id,) | |
).fetchall() | |
return jsonify([dict(chat) for chat in chats]) | |
finally: | |
conn.close() | |
def start_chat(): | |
try: | |
chat_id = str(uuid.uuid4()) | |
title = f"Чат от {datetime.now().strftime('%d.%m.%Y %H:%M')}" | |
conn = get_db_connection() | |
conn.execute( | |
"INSERT INTO chats (chat_id, user_id, created_at, title) VALUES (?, ?, ?, ?)", | |
(chat_id, current_user.id, datetime.now(), title) | |
) | |
conn.commit() | |
return jsonify({ | |
"success": True, | |
"chat_id": chat_id, | |
"title": title | |
}) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
finally: | |
conn.close() | |
def delete_chat(chat_id): | |
conn = get_db_connection() | |
try: | |
# Удаляем связанные сообщения | |
conn.execute("DELETE FROM messages WHERE chat_id = ?", (chat_id,)) | |
# Удаляем анализы эмоций, связанные с сообщениями этого чата | |
# (если у вас есть связь между analysis_reports и chat_id) | |
conn.execute(""" | |
DELETE FROM analysis_reports | |
WHERE content IN ( | |
SELECT content FROM messages WHERE chat_id = ? | |
) AND user_id = ? | |
""", (chat_id, current_user.id)) | |
# Удаляем сам чат | |
conn.execute("DELETE FROM chats WHERE chat_id = ? AND user_id = ?", | |
(chat_id, current_user.id)) | |
conn.commit() | |
return jsonify({"success": True}) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
finally: | |
conn.close() | |
def get_telegram_analysis(): | |
conn = get_db_connection() | |
try: | |
analyses = conn.execute( | |
"SELECT id, data, created_at FROM telegram_analysis WHERE user_id = ?", | |
(current_user.id,) | |
).fetchall() | |
return jsonify([dict(analysis) for analysis in analyses]) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
finally: | |
conn.close() | |
def load_chat(chat_id): | |
conn = get_db_connection() | |
try: | |
# Получаем информацию о чате | |
chat = conn.execute( | |
"SELECT chat_id, title FROM chats WHERE chat_id = ? AND user_id = ?", | |
(chat_id, current_user.id) | |
).fetchone() | |
if not chat: | |
return jsonify({"error": "Чат не найден"}), 404 | |
# Получаем сообщения чата | |
messages = conn.execute( | |
"SELECT sender, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC", | |
(chat_id,) | |
).fetchall() | |
return jsonify({ | |
"chat_id": chat["chat_id"], | |
"title": chat["title"], | |
"messages": [dict(msg) for msg in messages] | |
}) | |
finally: | |
conn.close() | |
def save_message(): | |
data = request.get_json() | |
if not data or 'chat_id' not in data or 'content' not in data or 'sender' not in data: | |
return jsonify({"error": "Неверные данные"}), 400 | |
conn = get_db_connection() | |
try: | |
# Проверяем, что чат принадлежит текущему пользователю | |
chat = conn.execute( | |
"SELECT chat_id FROM chats WHERE chat_id = ? AND user_id = ?", | |
(data['chat_id'], current_user.id) | |
).fetchone() | |
if not chat: | |
return jsonify({"error": "Чат не найден"}), 404 | |
# Анализируем эмоцию в тексте | |
emotion = "neutral" | |
confidence = 0.0 | |
if text_classifier and data['content'].strip(): | |
try: | |
predictions = text_classifier(data['content'])[0] | |
top_prediction = max(predictions, key=lambda x: x["score"]) | |
emotion = top_prediction["label"] | |
confidence = top_prediction["score"] | |
# Сохраняем анализ в базу | |
conn.execute( | |
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
(current_user.id, data['content'], emotion, confidence) | |
) | |
except Exception as e: | |
print(f"Ошибка анализа эмоции: {e}") | |
# Сохраняем сообщение | |
conn.execute( | |
"INSERT INTO messages (chat_id, sender, content, timestamp) VALUES (?, ?, ?, ?)", | |
(data['chat_id'], data['sender'], data['content'], datetime.now()) | |
) | |
conn.commit() | |
return jsonify({ | |
"status": "success", | |
"emotion": emotion_map.get(emotion, "❓ Неизвестно"), | |
"confidence": round(confidence, 2) | |
}) | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
finally: | |
conn.close() | |
if __name__ == "__main__": | |
gr.Interface.load("flask", app=app).launch() | |