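# Repository web-viewer header captured together with the file (not part of the program):
#   Athspi's picture | Update app.py | d060ce1 verified | raw | history blame | 6.11 kB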
import os
import numpy as np
from flask import Flask, request, jsonify, send_file, send_from_directory
import google.generativeai as genai
from gtts import gTTS, lang
import tempfile
import soundfile as sf
from kokoro import KPipeline
from werkzeug.utils import secure_filename
from flask_cors import CORS
# Flask application object; CORS is enabled for it with flask_cors defaults.
app = Flask(__name__, static_folder='static')
CORS(app)
app.config.update(MAX_CONTENT_LENGTH=50 * 1024 * 1024)  # 50MB request-body limit

# Gemini API credentials come from the environment; refuse to start without them.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")
genai.configure(api_key=GEMINI_API_KEY)

# Languages synthesized with Kokoro: display name -> single-letter code that
# is passed to KPipeline(lang_code=...).
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}

# Languages synthesized with gTTS: language code -> display name.
GTTS_LANGUAGES = lang.tts_langs()
# NOTE(review): Japanese is set explicitly, presumably because some gTTS
# versions do not list it -- confirm whether this is still needed.
GTTS_LANGUAGES['ja'] = 'Japanese'

# Every display name offered by either engine, de-duplicated and sorted.
SUPPORTED_LANGUAGES = sorted({*KOKORO_LANGUAGES, *GTTS_LANGUAGES.values()})
def upload_large_file(file_path):
    """Upload a file through the Gemini File API, splitting it if it is too large.

    The file is first uploaded in one piece. Only when that attempt fails with
    an error whose message contains "payload size exceeds" is the file cut
    into 20MB parts, each of which is written next to the original as
    ``<file_path>_part<i>`` and uploaded separately.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The uploaded file object when the single upload succeeds, otherwise a
        list of uploaded file objects, one per part, in file order.

    Raises:
        Exception: Any upload error other than the payload-size error, or any
            error raised while uploading the parts. In the latter case the
            parts uploaded so far are deleted again before re-raising.
    """
    try:
        return genai.upload_file(path=file_path)
    except Exception as e:
        if "payload size exceeds" not in str(e).lower():
            raise

    # NOTE(review): parts are raw byte slices of the original file, not
    # re-encoded audio; for container formats the parts after the first may
    # not be independently decodable -- confirm against the formats accepted.
    chunk_size = 20 * 1024 * 1024  # 20MB chunks
    file_parts = []
    try:
        with open(file_path, 'rb') as f:
            i = 0
            while chunk := f.read(chunk_size):
                part_path = f"{file_path}_part{i}"
                try:
                    with open(part_path, 'wb') as part_file:
                        part_file.write(chunk)
                    file_parts.append(genai.upload_file(path=part_path))
                finally:
                    # Remove the local part even when its upload failed.
                    if os.path.exists(part_path):
                        os.remove(part_path)
                i += 1
    except Exception:
        # Do not leave already-uploaded parts orphaned on the remote side.
        for part in file_parts:
            try:
                part.delete()
            except Exception as cleanup_error:
                app.logger.warning(
                    "Failed to delete uploaded part: %s", cleanup_error
                )
        raise
    return file_parts
def _delete_uploaded(uploaded):
    """Best-effort deletion of Gemini File API uploads.

    Accepts a single uploaded file object, a list of them, or a falsy value
    (nothing to do). Failures are logged instead of raised so that cleanup
    never replaces the error or response that triggered it.
    """
    if not uploaded:
        return
    remote_files = uploaded if isinstance(uploaded, list) else [uploaded]
    for remote_file in remote_files:
        try:
            remote_file.delete()
        except Exception as cleanup_error:
            app.logger.warning("Failed to delete uploaded file: %s", cleanup_error)


def _new_temp_path(suffix):
    """Create an empty, uniquely named temp file and return its path.

    ``tempfile.mkstemp`` returns an open OS-level handle; it is closed here
    so the process does not leak one descriptor per request.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


@app.route('/translate', methods=['POST'])
def translate_audio():
    """Transcribe an uploaded audio file, translate it and synthesize speech.

    Expects a multipart POST with an ``audio`` file and an optional
    ``language`` form field (default ``'English'``). The audio is uploaded to
    Gemini for transcription, the transcript is translated by Gemini, and the
    translation is spoken with Kokoro (languages in ``KOKORO_LANGUAGES``) or
    gTTS (everything else, falling back to ``'en'`` for unknown names).

    Returns:
        JSON with ``transcription``, ``translation`` and ``audio_url`` on
        success; JSON ``{'error': ...}`` with status 400 for a missing or
        unnamed file and status 500 for any processing failure. HTTP errors
        raised by the framework keep their own status code.
    """
    # Imported locally so this handler does not depend on a change to the
    # file's import block.
    from werkzeug.exceptions import HTTPException

    temp_input_path = None
    temp_output_path = None
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400
        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')
        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Save to a uniquely named temp file. Only the sanitized extension of
        # the client's filename is kept, so concurrent uploads that share a
        # name cannot overwrite each other and an empty sanitized name cannot
        # resolve to the temp directory itself.
        suffix = os.path.splitext(secure_filename(audio_file.filename))[1]
        temp_input_path = _new_temp_path(suffix)
        audio_file.save(temp_input_path)

        # Upload using File API
        uploaded_file = upload_large_file(temp_input_path)

        # Get transcription; the remote upload(s) are removed exactly once,
        # whether or not transcription succeeds.
        model = genai.GenerativeModel("gemini-2.0-flash-lite")
        try:
            if isinstance(uploaded_file, list):
                # Chunked upload: transcribe part by part and join the text.
                transcripts = [
                    model.generate_content(
                        ["Transcribe this audio chunk:", part]
                    ).text
                    for part in uploaded_file
                ]
                transcription = " ".join(transcripts)
            else:
                response = model.generate_content(
                    ["Transcribe this audio file:", uploaded_file]
                )
                transcription = response.text
        finally:
            _delete_uploaded(uploaded_file)

        # Translate text using Gemini
        prompt = f"Translate the following text to {target_language} preserving meaning and cultural nuances. Respond only with the translation:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # Generate TTS
        if target_language in KOKORO_LANGUAGES:
            lang_code = KOKORO_LANGUAGES[target_language]
            pipeline = KPipeline(lang_code=lang_code)
            generator = pipeline(translated_text, voice="af_heart", speed=1)
            # Collect all audio segments
            audio_segments = [
                audio for _, _, audio in generator if audio is not None
            ]
            if not audio_segments:
                raise ValueError("No audio generated by Kokoro")
            audio_data = np.concatenate(audio_segments)
            temp_output_path = _new_temp_path(".wav")
            sf.write(temp_output_path, audio_data, 24000)
        else:
            # Standard gTTS handling: map the display name back to its code.
            lang_code = next(
                (k for k, v in GTTS_LANGUAGES.items() if v == target_language),
                'en',
            )
            tts = gTTS(translated_text, lang=lang_code)
            temp_output_path = _new_temp_path(".mp3")
            tts.save(temp_output_path)

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        })
    except HTTPException:
        # Let Flask render framework HTTP errors (for example 413 when the
        # body exceeds MAX_CONTENT_LENGTH) instead of reporting them as 500.
        raise
    except Exception as e:
        # Drop a partially written output file; it will never be downloaded.
        if temp_output_path and os.path.exists(temp_output_path):
            os.remove(temp_output_path)
        app.logger.exception("Error processing request: %s", e)
        return jsonify({'error': str(e)}), 500
    finally:
        if temp_input_path and os.path.exists(temp_input_path):
            os.remove(temp_input_path)
@app.route('/download/<filename>')
def download_file(filename):
    """Send a previously generated audio file from the temp directory.

    Only plain file names ending in ``.mp3`` or ``.wav`` are served, which
    are the two suffixes the translate endpoint creates. The temp directory
    is shared with other programs, so anything else is reported as not found.

    Args:
        filename: Base name of the file, as returned in ``audio_url``.

    Returns:
        The file as an attachment named ``translated_<filename>`` with a MIME
        type matching its extension, or JSON ``{'error': 'File not found'}``
        with status 404.
    """
    mimetype_by_extension = {".mp3": "audio/mpeg", ".wav": "audio/wav"}
    extension = os.path.splitext(filename)[1].lower()

    # Reject names that are not already in sanitized form (e.g. "..") and
    # files that are not audio produced by this application.
    if secure_filename(filename) != filename or extension not in mimetype_by_extension:
        return jsonify({'error': 'File not found'}), 404

    try:
        return send_file(
            os.path.join(tempfile.gettempdir(), filename),
            mimetype=mimetype_by_extension[extension],
            as_attachment=True,
            download_name=f"translated_{filename}"
        )
    except FileNotFoundError:
        return jsonify({'error': 'File not found'}), 404
# Script entry point: start Flask's built-in server, listening on all
# interfaces (0.0.0.0) at port 7820.
if __name__ == "__main__":
    app.run(port=7820, host="0.0.0.0")