"""Flask backend for the Athspi chat assistant.

Forwards chat messages to the Gemini API with per-session, in-memory
conversation history, renders responses as HTML, and uses gTTS to generate
MP3 audio for content the model wraps in [AUDIO]...[/AUDIO] tags.
"""

from flask import Flask, request, jsonify, send_from_directory
import google.generativeai as genai
from dotenv import load_dotenv
import os
from flask_cors import CORS
import markdown2
import re
from gtts import gTTS
import uuid
from collections import deque

load_dotenv()

# Generated MP3 files are written here and served from /download/<filename>.
AUDIO_FOLDER = os.path.join('static', 'audio')
os.makedirs(AUDIO_FOLDER, exist_ok=True)

app = Flask(__name__, static_folder='static')
CORS(app)

# In-memory per-session history; each deque keeps only the most recent
# MAX_HISTORY_LENGTH entries (user and model turns).
conversation_histories = {}
MAX_HISTORY_LENGTH = 10

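# System prompt: the model is asked to wrap audio-friendly passages in
# [AUDIO]...[/AUDIO] tags, which process_response() later extracts for TTS.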
system_instruction = """ |
|
You are a helpful AI assistant named Athspi. When responding: |
|
1. Maintain conversation context naturally |
|
2. For responses that would benefit from audio (like stories), include between: |
|
[AUDIO]content here[/AUDIO] |
|
3. Keep responses conversational and friendly |
|
4. Remember previous interactions in this conversation |
|
""" |
|
|
|
genai.configure(api_key=os.getenv("GEMINI_API_KEY")) |
|
model = genai.GenerativeModel('gemini-2.5-flash', system_instruction=system_instruction) |
|
|
|
def convert_markdown_to_html(text):
    """Render Markdown to HTML and tag code blocks for client-side styling."""
    html = markdown2.markdown(text, extras=["fenced-code-blocks", "tables"])
    html = re.sub(r'<pre><code(.*?)>', r'<pre class="code-block"><code\1>', html)
    return html


def process_response(full_response):
    """Split a model response into visible text and optional [AUDIO] content."""
    audio_match = re.search(r'\[AUDIO\](.*?)\[/AUDIO\]', full_response, re.DOTALL)
    audio_content = audio_match.group(1).strip() if audio_match else None
    visible_text = re.sub(r'\[/?AUDIO\]', '', full_response).strip()
    return visible_text, audio_content

def generate_audio(text):
    """Synthesize text to an MP3 in AUDIO_FOLDER and return the filename."""
    # Drop characters that gTTS tends to read out awkwardly (markdown symbols, etc.).
    text = re.sub(r'[^\w\s.,!?\-]', '', text)
    filename = f"audio_{uuid.uuid4()}.mp3"
    filepath = os.path.join(AUDIO_FOLDER, filename)
    tts = gTTS(text=text, lang='en', slow=False)
    tts.save(filepath)
    return filename


def get_conversation_history(session_id):
    """Return the history deque for a session, creating it if needed."""
    if session_id not in conversation_histories:
        conversation_histories[session_id] = deque(maxlen=MAX_HISTORY_LENGTH)
    return conversation_histories[session_id]

@app.route('/start_session', methods=['POST'])
def start_session():
    session_id = str(uuid.uuid4())
    conversation_histories[session_id] = deque(maxlen=MAX_HISTORY_LENGTH)
    return jsonify({"session_id": session_id})

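# Main chat endpoint: forwards the message to Gemini with this session's
# history, then returns plain text, rendered HTML, and optional audio metadata.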
@app.route('/chat', methods=['POST'])
def chat():
    try:
        data = request.get_json(silent=True) or {}
        user_message = data.get('message', '').strip()
        session_id = data.get('session_id')

        if not user_message:
            return jsonify({"error": "Message required"}), 400
        if not session_id:
            return jsonify({"error": "Session ID required"}), 400

        history = get_conversation_history(session_id)

        # Replay the stored history so the model keeps conversation context.
        chat_session = model.start_chat(history=list(history))
        response = chat_session.send_message(user_message)

        # Record the new exchange; the deque drops the oldest turns automatically.
        history.extend([
            {"role": "user", "parts": [user_message]},
            {"role": "model", "parts": [response.text]}
        ])

        visible_text, audio_content = process_response(response.text)

        result = {
            "response_text": visible_text,
            "response_html": convert_markdown_to_html(visible_text),
            "has_audio": False
        }

        if audio_content:
            audio_filename = generate_audio(audio_content)
            result["audio_filename"] = audio_filename
            result["has_audio"] = True

        return jsonify(result)

    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/download/<filename>')
def download_audio(filename):
    # send_from_directory raises a 404 NotFound rather than FileNotFoundError for
    # missing files, so check explicitly in order to return a JSON error body.
    if not os.path.isfile(os.path.join(AUDIO_FOLDER, filename)):
        return jsonify({"error": "Audio file not found"}), 404
    return send_from_directory(AUDIO_FOLDER, filename, as_attachment=True)

@app.route('/')
def serve_index():
    return send_from_directory('static', 'index.html')


@app.route('/<path:path>')
def serve_static(path):
    return send_from_directory('static', path)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)