|
|
|
from flask import Flask, request, jsonify, send_from_directory |
|
import google.generativeai as genai |
|
from dotenv import load_dotenv |
|
import os |
|
from flask_cors import CORS |
|
import markdown2 |
|
import re |
|
from gtts import gTTS |
|
import uuid |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
# Generated MP3 files are written here and served back via /static/audio/<name>.
AUDIO_FOLDER = os.path.join('static', 'audio')
# exist_ok=True avoids the check-then-create race of the old
# `if not os.path.exists(...)` guard when multiple workers start at once.
os.makedirs(AUDIO_FOLDER, exist_ok=True)

app = Flask(__name__, static_folder='static')
# Allow cross-origin requests so a separately hosted frontend can call the API.
CORS(app)
|
|
|
|
|
# System prompt sent to Gemini on every request.  The [AUDIO_START]/[AUDIO_END]
# markers are an internal protocol: extract_audio_content() pulls the speakable
# portion out of the reply and clean_visible_response() strips the markers before
# the text is shown to the user.  Do not edit the marker spellings here without
# updating those helpers' regexes to match.
system_instruction_text = """
You are a friendly, natural-sounding AI assistant named Athspi.
When responding:
- Use a warm, conversational tone
- Never mention technical terms like "audio", "text", or "response"
- For stories, begin with "Here's your story π" followed by a friendly intro
- For explanations, use simple, clear language
- Format responses for pleasant reading and listening
- When audio is requested, include story content between special markers as shown:
[AUDIO_START]
[story content here]
[AUDIO_END]
But DO NOT include these markers in the visible response
"""
|
|
|
# Configure the Gemini client from the GEMINI_API_KEY environment variable
# (loaded from .env by load_dotenv() above).  NOTE(review): if the variable is
# unset this passes api_key=None and only fails later, on the first request.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
# Single shared model instance; the system prompt is attached once here rather
# than being prepended to every user message.
model = genai.GenerativeModel(
    'gemini-2.5-flash',
    system_instruction=system_instruction_text
)
|
|
|
def convert_markdown_to_html(text):
    """Render the model's markdown reply to HTML for the chat UI.

    Uses markdown2 with fenced code blocks and tables enabled, then tags
    ``<pre><code>`` pairs with a ``code-block`` class so the frontend CSS
    can style them.

    Note: the previous version also ran ``**...**`` / ``*...*`` emphasis
    regexes over the *rendered HTML*.  markdown2 already converts emphasis
    to <strong>/<em>, so those passes were redundant — and worse, they could
    mangle literal asterisks/underscores that markdown2 deliberately
    preserved inside code blocks.  They have been removed.

    :param text: markdown source (the model's reply).
    :return: HTML string.
    """
    html = markdown2.markdown(text, extras=["fenced-code-blocks", "tables"])
    # Add a styling hook to every fenced/indented code block.
    html = re.sub(r'<pre><code(.*?)>', r'<pre class="code-block"><code\1>', html)
    return html
|
|
|
def detect_audio_request(text):
    """Return True when the user's message asks for spoken output.

    Matching is a simple case-insensitive substring scan over a fixed
    keyword list, so e.g. "auditorium" would also trigger it ("audio").
    """
    lowered = text.lower()
    for keyword in ('audio', 'speak', 'say it', 'read aloud',
                    'hear', 'listen', 'tell me out loud'):
        if keyword in lowered:
            return True
    return False
|
|
|
def extract_audio_content(full_text):
    """Return the text between [AUDIO_START] and [AUDIO_END], stripped.

    Falls back to the whole input when the model did not emit the markers.
    DOTALL lets the marked section span multiple lines.
    """
    marked = re.search(r'\[AUDIO_START\](.*?)\[AUDIO_END\]', full_text, re.DOTALL)
    return marked.group(1).strip() if marked else full_text
|
|
|
def clean_visible_response(full_text):
    """Strip the internal [AUDIO_START]/[AUDIO_END] markers from the reply.

    Only the markers are removed; the content between them stays visible.
    Leading/trailing whitespace of the result is trimmed.
    """
    without_markers = full_text.replace('[AUDIO_START]', '').replace('[AUDIO_END]', '')
    return without_markers.strip()
|
|
|
def generate_audio_file(text):
    """Synthesize *text* to an MP3 under AUDIO_FOLDER.

    Markdown punctuation (*, _, `, #) is stripped so it is not read aloud,
    and whitespace is collapsed to single spaces.  Returns the generated
    filename (not the full path), or None when nothing speakable remains.
    """
    speakable = re.sub(r'[\*_`#]', '', text)
    speakable = re.sub(r'\s+', ' ', speakable).strip()

    if not speakable:
        return None

    # Random name so concurrent requests never clobber each other's files.
    unique_name = f"{uuid.uuid4()}.mp3"
    destination = os.path.join(AUDIO_FOLDER, unique_name)

    gTTS(text=speakable, lang='en', slow=False).save(destination)

    return unique_name
|
|
|
@app.route('/chat', methods=['POST'])
def chat():
    """Handle one chat turn: ask Gemini, optionally synthesize audio.

    Expects JSON ``{"message": str}``.  Responds with JSON containing
    ``response_html`` (rendered markdown), ``response_text`` (plain text),
    and ``audio_url`` (``/static/audio/...`` or null).
    Returns 400 on a missing/empty message, 500 on any internal failure.
    """
    try:
        # silent=True yields None (instead of raising) on a missing or
        # malformed JSON body, so bad requests get a clean 400 below rather
        # than falling through to the generic 500 handler.
        data = request.get_json(silent=True) or {}
        user_message = data.get('message')

        if not user_message:
            return jsonify({"error": "No message provided"}), 400

        audio_requested = detect_audio_request(user_message)

        if audio_requested:
            # Nudge the model to wrap the speakable portion in the markers
            # that extract_audio_content() looks for.
            user_message += "\n\nPlease include [AUDIO_START] and [AUDIO_END] markers around the story content."

        response = model.generate_content(user_message)
        full_response = response.text

        # The markers are an internal protocol — never show them to the user.
        visible_response = clean_visible_response(full_response)

        audio_url = None
        if audio_requested:
            audio_content = extract_audio_content(full_response)
            if not audio_content:
                # Model emitted empty markers; speak the visible reply instead.
                audio_content = visible_response

            audio_filename = generate_audio_file(audio_content)
            if audio_filename:
                audio_url = f"/static/audio/{audio_filename}"

        html_response = convert_markdown_to_html(visible_response)

        return jsonify({
            "response_html": html_response,
            "response_text": visible_response,
            "audio_url": audio_url
        })

    except Exception as e:
        app.logger.error(f"Chat Error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/generate-audio', methods=['POST'])
def generate_audio():
    """Synthesize arbitrary text to speech.

    Expects JSON ``{"text": str}``; responds with ``{"audio_url": ...}``.
    Returns 400 when text is missing or becomes empty after cleaning,
    500 on synthesis failure.
    """
    try:
        # silent=True -> None on malformed JSON, so we 400 instead of 500.
        data = request.get_json(silent=True) or {}
        text_to_speak = data.get('text')

        if not text_to_speak:
            return jsonify({"error": "No text provided"}), 400

        # Reuse the shared helper instead of duplicating its cleaning and
        # gTTS logic inline; it returns None when nothing speakable remains.
        filename = generate_audio_file(text_to_speak)
        if filename is None:
            return jsonify({"error": "Text became empty after cleaning"}), 400

        # BUG FIX: the URL previously interpolated a literal placeholder
        # ("(unknown)") instead of the generated filename, so clients could
        # never fetch the file they just requested.
        return jsonify({"audio_url": f"/static/audio/{filename}"})

    except Exception as e:
        app.logger.error(f"Audio Generation Error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
@app.route('/static/audio/<filename>')
def serve_audio(filename):
    """Serve a previously generated MP3 from the audio folder.

    send_from_directory rejects path-traversal attempts in *filename*.
    """
    audio_directory = AUDIO_FOLDER
    return send_from_directory(audio_directory, filename)
|
|
|
@app.route('/')
def serve_index():
    """Serve the single-page frontend shell."""
    index_page = 'index.html'
    return send_from_directory('static', index_page)
|
|
|
@app.route('/<path:path>')
def serve_static(path):
    """Catch-all: serve any other asset (JS/CSS/images) from static/."""
    asset_root = 'static'
    return send_from_directory(asset_root, path)
|
|
|
# Development entry point: bind on all interfaces.  Port 7860 — presumably
# targeting Hugging Face Spaces; confirm against the deployment config.
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)