athspi / app.py
Athspi's picture
Update app.py
465bca7 verified
raw
history blame
5.65 kB
# app.py - Complete Flask Backend
from flask import Flask, request, jsonify, send_from_directory
import google.generativeai as genai
from dotenv import load_dotenv
import os
from flask_cors import CORS
import markdown2
import re
from gtts import gTTS
import uuid
import logging
# --- Logging ---------------------------------------------------------------
# Process-wide INFO-level logging; `logger` is the module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Environment -----------------------------------------------------------
# Read a local .env file (if present) before any os.getenv() lookups below.
load_dotenv()

# --- Audio storage ---------------------------------------------------------
MAX_AUDIO_LENGTH = 5000  # max characters of cleaned text kept for TTS
AUDIO_FOLDER = os.path.join('static', 'audio')
# Created at import time so audio can be saved without further checks.
os.makedirs(AUDIO_FOLDER, exist_ok=True)

# --- Flask application -----------------------------------------------------
app = Flask(__name__, static_folder='static')
CORS(app)  # flask-cors with its default settings
# System instruction passed to the Gemini model via `system_instruction=`.
# It asks the model to emit the literal marker [AUDIO] when the user wants a
# spoken reply; process_response() detects that marker and strips it from the
# text before rendering. Treat the string as runtime data: editing it changes
# model behavior.
SYSTEM_INSTRUCTION = """
You are AstroChat, an advanced AI assistant with voice capabilities. Follow these guidelines:
1. Voice Responses:
- When users request audio (e.g., "read this", "speak aloud", "audio version"), include [AUDIO] in response
- Structure responses for optimal TTS:
* Short sentences (12-15 words)
* Pause between paragraphs
* Spell out complex terms
2. Content Formatting:
- Code: Explain → Format in markdown
- Lists: Use bullet points
- Quotes: Provide attribution
- Math/Science: Explain symbols verbally
3. Interaction Style:
- Friendly but professional
- Ask clarifying questions
- Admit knowledge limits
- Offer follow-up suggestions
4. Special Cases:
- Acronyms: Spell out first use
- Names: Provide pronunciation hints
- Technical terms: Give simple definitions
"""
# Initialize Gemini
# The API key is read from the GEMINI_API_KEY environment variable.
genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))

# Single shared model instance used by the /chat handler.
model = genai.GenerativeModel(
    model_name='gemini-1.5-flash',
    system_instruction=SYSTEM_INSTRUCTION,
)
def process_response(text):
    """Split a raw model reply into HTML, plain text, and an audio flag.

    The literal marker ``[AUDIO]`` signals that the reply should be spoken.
    It is detected first, then removed before the markdown is rendered.

    Args:
        text: Raw reply text from the model (markdown).

    Returns:
        dict with three keys:
            ``response_html``: rendered markdown, with ``<pre>`` code blocks
                tagged ``class="code-block"`` and links opening in a new tab.
            ``response_text``: the marker-free, stripped markdown source.
            ``audio_requested``: True when the marker was present.
    """
    marker = '[AUDIO]'
    wants_audio = marker in text
    plain = text.replace(marker, '').strip()

    rendered = markdown2.markdown(
        plain,
        extras=["fenced-code-blocks", "tables", "code-friendly", "cuddled-lists"],
    )

    # (pattern, replacement) pairs applied in order to the rendered HTML.
    rewrites = (
        # Give code blocks a CSS hook for styling.
        (r'<pre><code(.*?)>', r'<pre class="code-block"><code\1>'),
        # Open links in a new tab without handing over window.opener.
        (
            r'<a href="(.*?)">(.*?)</a>',
            r'<a href="\1" target="_blank" rel="noopener">\2</a>',
        ),
    )
    for pattern, replacement in rewrites:
        rendered = re.sub(pattern, replacement, rendered)

    return {
        "response_html": rendered,
        "response_text": plain,
        "audio_requested": wants_audio,
    }
@app.route('/chat', methods=['POST'])
def handle_chat():
    """Answer a chat message with Gemini and return the processed reply.

    Expects a JSON object body with a ``message`` string. On success the
    response is the dict built by ``process_response`` (``response_html``,
    ``response_text``, ``audio_requested``). ``audio_requested`` is forced to
    True when the user's message contains one of the audio trigger phrases,
    regardless of what the model returned.

    Returns:
        200 with the processed reply;
        400 when the body is not a JSON object, or ``message`` is missing,
        blank, or not a string;
        500 when generation or post-processing fails.
    """
    # silent=True returns None instead of raising on a missing, malformed or
    # non-JSON body, so client mistakes are answered with 400 rather than
    # being swallowed by the generic 500 handler below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON body"}), 400

    user_message = data.get('message')
    if not isinstance(user_message, str) or not user_message.strip():
        return jsonify({"error": "Empty message"}), 400
    user_message = user_message.strip()

    # Phrases in the user's own message that explicitly ask for speech.
    audio_triggers = (
        "read aloud", "speak this", "audio please",
        "say it", "voice response", "read this",
        "can you speak", "tell me aloud",
    )
    lowered = user_message.lower()
    explicit_audio = any(trigger in lowered for trigger in audio_triggers)

    try:
        # Generate response
        response = model.generate_content(user_message)
        processed = process_response(response.text)
    except Exception as e:
        # Top-level boundary: log with traceback and report a generic failure.
        logger.exception("Chat error: %s", e)
        # NOTE(review): "details" echoes the internal exception text to the
        # client; kept for compatibility with existing consumers of this API.
        return jsonify({
            "error": "I encountered an error",
            "details": str(e)
        }), 500

    # Force audio if explicitly requested
    if explicit_audio:
        processed["audio_requested"] = True
    return jsonify(processed)
@app.route('/generate-audio', methods=['POST'])
def handle_audio():
    """Synthesize speech for the posted text and return the MP3's URL.

    Expects a JSON object body with a ``text`` string. Markdown punctuation
    is removed, whitespace is collapsed, and the result is cut to
    ``MAX_AUDIO_LENGTH`` characters (a truncation notice is appended) before
    being passed to gTTS. The MP3 is written under ``AUDIO_FOLDER`` with a
    random UUID-based file name.

    Returns:
        200 with ``audio_url`` (path under ``/audio/``) and ``text_length``
        (length of the text actually sent to TTS);
        400 when the body is not a JSON object, ``text`` is missing or not a
        string, or nothing remains after cleaning;
        500 when speech synthesis or saving fails.
    """
    # silent=True returns None instead of raising on a missing, malformed or
    # non-JSON body, so client mistakes get a 400 instead of a generic 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON body"}), 400

    text = data.get('text')
    if not isinstance(text, str) or not text.strip():
        return jsonify({"error": "No text provided"}), 400

    # Enhanced text cleaning
    clean_text = re.sub(r'[\*_`#\[\]]', '', text.strip())  # Remove markdown
    clean_text = re.sub(r'\s+', ' ', clean_text).strip()
    # Input made only of markdown punctuation (e.g. "***") is empty by now;
    # reject it here rather than letting the TTS call fail with a 500.
    if not clean_text:
        return jsonify({"error": "No text provided"}), 400

    # Safe truncation
    if len(clean_text) > MAX_AUDIO_LENGTH:
        clean_text = clean_text[:MAX_AUDIO_LENGTH]
        clean_text += "... [content truncated]"

    # Generate unique filename
    filename = f"audio_{uuid.uuid4()}.mp3"
    filepath = os.path.join(AUDIO_FOLDER, filename)

    try:
        # Generate speech with enhanced parameters
        tts = gTTS(
            text=clean_text,
            lang='en',
            slow=False,
            lang_check=False,
            pre_processor_funcs=[
                # Handle camelCase: insert a space at lower->upper boundaries
                lambda x: re.sub(r'([a-z])([A-Z])', r'\1 \2', x)
            ]
        )
        tts.save(filepath)
    except Exception as e:
        # Top-level boundary: log with traceback and report a generic failure.
        logger.exception("Audio error: %s", e)
        # A failed save may leave an empty or partial file behind; drop it so
        # broken audio files do not pile up in the audio folder.
        try:
            os.remove(filepath)
        except OSError:
            pass  # nothing was written, or it is already gone
        # NOTE(review): "details" echoes the internal exception text to the
        # client; kept for compatibility with existing consumers of this API.
        return jsonify({
            "error": "Audio generation failed",
            "details": str(e)
        }), 500

    return jsonify({
        "audio_url": f"/audio/{filename}",
        "text_length": len(clean_text)
    })
@app.route('/audio/<filename>')
def serve_audio(filename):
    """Serve a generated MP3 from ``AUDIO_FOLDER``.

    Args:
        filename: Single path segment naming the file (the route's default
            converter does not match slashes).

    Returns:
        The file response, or a JSON ``{"error": ...}`` body with status 404
        when no such file exists.
    """
    # send_from_directory reports a missing file by raising an HTTP 404
    # exception, not FileNotFoundError, so catching FileNotFoundError around
    # it never produces the JSON error. Check for the file up front instead.
    # Flask resolves a relative directory against the application root path,
    # so the same base is used here.
    candidate = os.path.join(app.root_path, AUDIO_FOLDER, filename)
    if not os.path.isfile(candidate):
        return jsonify({"error": "Audio file not found"}), 404
    # send_from_directory still performs its own safe path joining.
    return send_from_directory(AUDIO_FOLDER, filename)
@app.route('/')
def serve_index():
    """Serve the front-end entry page, ``index.html`` from ``static``."""
    index_page = send_from_directory('static', 'index.html')
    return index_page
@app.route('/<path:path>')
def serve_static(path):
    """Serve any other requested path as a file from the ``static`` folder.

    Args:
        path: Request path (may contain slashes) relative to ``static``.
    """
    asset = send_from_directory('static', path)
    return asset
if __name__ == '__main__':
    # Listen on all interfaces; the PORT environment variable overrides the
    # default port 7860.
    app.run(host="0.0.0.0", port=int(os.getenv('PORT', 7860)))