athspi / app.py
Athspi's picture
Update app.py
26c1fb4 verified
raw
history blame
4.24 kB
# app.py - Flask Backend
from flask import Flask, request, jsonify, send_from_directory
import google.generativeai as genai
from dotenv import load_dotenv
import os
from flask_cors import CORS
import markdown2
import re
from gtts import gTTS
import uuid
from collections import deque
# Load environment variables
load_dotenv()
# Configure paths
AUDIO_FOLDER = os.path.join('static', 'audio')
os.makedirs(AUDIO_FOLDER, exist_ok=True)
app = Flask(__name__, static_folder='static')
CORS(app)
# Conversation history storage
conversation_histories = {}
MAX_HISTORY_LENGTH = 10 # Number of messages to remember
# AI Configuration
system_instruction = """
You are a helpful AI assistant named Athspi. When responding:
1. Maintain conversation context naturally
2. For responses that would benefit from audio (like stories), include between:
[AUDIO]content here[/AUDIO]
3. Keep responses conversational and friendly
4. Remember previous interactions in this conversation
"""
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
model = genai.GenerativeModel('gemini-2.5-flash', system_instruction=system_instruction)
def convert_markdown_to_html(text):
html = markdown2.markdown(text, extras=["fenced-code-blocks", "tables"])
html = re.sub(r'<pre><code(.*?)>', r'<pre class="code-block"><code\1>', html)
return html
def process_response(full_response):
audio_match = re.search(r'\[AUDIO\](.*?)\[/AUDIO\]', full_response, re.DOTALL)
audio_content = audio_match.group(1).strip() if audio_match else None
visible_text = re.sub(r'\[/?AUDIO\]', '', full_response).strip()
return visible_text, audio_content
def generate_audio(text):
text = re.sub(r'[^\w\s.,!?\-]', '', text)
filename = f"audio_{uuid.uuid4()}.mp3"
filepath = os.path.join(AUDIO_FOLDER, filename)
tts = gTTS(text=text, lang='en', slow=False)
tts.save(filepath)
return filename
def get_conversation_history(session_id):
if session_id not in conversation_histories:
conversation_histories[session_id] = deque(maxlen=MAX_HISTORY_LENGTH)
return conversation_histories[session_id]
@app.route('/start_session', methods=['POST'])
def start_session():
session_id = str(uuid.uuid4())
conversation_histories[session_id] = deque(maxlen=MAX_HISTORY_LENGTH)
return jsonify({"session_id": session_id})
@app.route('/chat', methods=['POST'])
def chat():
try:
data = request.json
user_message = data.get('message', '').strip()
session_id = data.get('session_id')
if not user_message:
return jsonify({"error": "Message required"}), 400
if not session_id:
return jsonify({"error": "Session ID required"}), 400
history = get_conversation_history(session_id)
# Build conversation context
chat_session = model.start_chat(history=list(history))
# Get AI response with context
response = chat_session.send_message(user_message)
# Update history
history.extend([
{"role": "user", "parts": [user_message]},
{"role": "model", "parts": [response.text]}
])
visible_text, audio_content = process_response(response.text)
result = {
"response_text": visible_text,
"response_html": convert_markdown_to_html(visible_text),
"has_audio": False
}
if audio_content:
audio_filename = generate_audio(audio_content)
result["audio_filename"] = audio_filename
result["has_audio"] = True
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/download/<filename>')
def download_audio(filename):
try:
return send_from_directory(AUDIO_FOLDER, filename, as_attachment=True)
except FileNotFoundError:
return jsonify({"error": "Audio file not found"}), 404
@app.route('/')
def serve_index():
return send_from_directory('static', 'index.html')
@app.route('/<path:path>')
def serve_static(path):
return send_from_directory('static', path)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=7860)