# Spaces: Running (web-page residue captured with the listing; not part of the program)
import base64
import os
import tempfile
import wave

import numpy as np
from flask import Flask, request, jsonify, send_file, send_from_directory
from flask_cors import CORS
from google import genai
from google.genai import types
from gtts import gTTS, lang
from kokoro import KPipeline
from werkzeug.utils import secure_filename
# Flask application object; static assets are looked up in the "static" folder.
app = Flask(__name__, static_folder='static')
# Apply flask-cors to the whole app with its default settings.
CORS(app)

# Gemini API key comes from the environment; fail at import time when it is
# missing so the process never starts half-configured.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")

# Shared Gemini client used by the request handlers below.
client = genai.Client(api_key=GEMINI_API_KEY)
# Language configurations

# Display name -> Kokoro `lang_code` for the languages synthesized with Kokoro.
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Japanese": "j",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}

# gTTS language code -> display name; the fallback engine for every language
# that is not in KOKORO_LANGUAGES.
GTTS_LANGUAGES = lang.tts_langs()

# Sorted display names offered to the client. Some names (e.g. "French",
# "Spanish") are provided by both engines, so take a set union to avoid
# listing them twice.
SUPPORTED_LANGUAGES = sorted(set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values()))
def serve_index():
    """Return ``index.html`` from the application's static folder."""
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view of the file -- confirm it is registered as a route.
    static_root = app.static_folder
    return send_from_directory(static_root, 'index.html')
def get_languages():
    """Return the supported target-language names as a JSON array."""
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view of the file -- confirm it is registered as a route.
    response = jsonify(SUPPORTED_LANGUAGES)
    return response
def _new_temp_path(suffix):
    """Create an empty file in the temp directory and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    # mkstemp hands back an open descriptor; close it so it is not leaked.
    os.close(fd)
    return path


def _generate_text(contents, config):
    """Run one streamed Gemini request and return the concatenated text."""
    stream = client.models.generate_content_stream(
        model="gemini-2.0-flash-lite",
        contents=contents,
        config=config,
    )
    # A chunk without text parts reports `text` as None; treat it as empty.
    return "".join(chunk.text or "" for chunk in stream)


def _as_float_array(audio):
    """Convert one Kokoro audio segment (tensor or array-like) to 1-D float32."""
    if hasattr(audio, "detach"):
        # Tensor-like object: move it to host memory before converting.
        audio = audio.detach().cpu().numpy()
    return np.asarray(audio, dtype=np.float32).reshape(-1)


def _write_wav_pcm16(path, samples, sample_rate):
    """Write mono float samples in [-1, 1] to *path* as a 16-bit PCM WAV file."""
    pcm = (np.clip(samples, -1.0, 1.0) * 32767.0).astype("<i2")
    with wave.open(path, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm.tobytes())


def _synthesize_speech(text, target_language):
    """Render *text* to an audio file in the temp directory.

    Kokoro is used for the languages in KOKORO_LANGUAGES (WAV output); every
    other language goes through gTTS (MP3 output), falling back to 'en' when
    the name is not a known gTTS language.

    Returns:
        The path of the generated file, or None if Kokoro yielded no audio.
    """
    use_kokoro = target_language in KOKORO_LANGUAGES
    if use_kokoro:
        pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
        # Keep every segment the pipeline yields, not only the first one, so
        # longer texts are not truncated.
        segments = [
            _as_float_array(audio)
            for _, _, audio in pipeline(text, voice="af_heart", speed=1)
            if audio is not None
        ]
        if not segments:
            return None
        samples = np.concatenate(segments)
    else:
        lang_code = next(
            (code for code, name in GTTS_LANGUAGES.items() if name == target_language),
            'en',
        )
        tts = gTTS(text, lang=lang_code)

    output_path = _new_temp_path(".wav" if use_kokoro else ".mp3")
    try:
        if use_kokoro:
            _write_wav_pcm16(output_path, samples, 24000)
        else:
            tts.save(output_path)
    except Exception:
        # Do not leave an empty placeholder behind when synthesis fails.
        os.remove(output_path)
        raise
    return output_path


def translate_audio():
    """Transcribe an uploaded audio file, translate it, and synthesize speech.

    Reads the ``audio`` file and the optional ``language`` form field
    (default ``'English'``) from the current Flask request.

    Returns:
        A JSON response with ``transcription``, ``translation`` and
        ``audio_url`` (``/download/<generated file name>``) on success;
        a JSON ``error`` with status 400 for a missing/invalid upload, or
        status 500 when any later step fails.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view of the file -- confirm it is registered as a route.
    temp_input_path = None
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400
        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')
        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Store the upload under a unique server-chosen name so concurrent
        # requests cannot overwrite each other; keep only a plain extension
        # so the file type can still be recognised from the name.
        extension = os.path.splitext(audio_file.filename)[1].lower()
        suffix = extension if extension[1:].isalnum() else ""
        temp_input_path = _new_temp_path(suffix)
        audio_file.save(temp_input_path)

        # Transcribe audio using Gemini
        uploaded = client.files.upload(file=temp_input_path)
        transcribe_contents = [
            types.Content(
                role="user",
                parts=[
                    types.Part.from_uri(
                        file_uri=uploaded.uri,
                        mime_type=uploaded.mime_type,
                    ),
                    types.Part.from_text(text="Transcript the audio and provide only the text. Do not include any explanations or additional information."),
                ],
            ),
        ]
        generate_content_config = types.GenerateContentConfig(
            temperature=1,
            top_p=0.95,
            top_k=40,
            max_output_tokens=8192,
            response_mime_type="text/plain",
        )
        transcription = _generate_text(transcribe_contents, generate_content_config)

        # Translate text using Gemini
        translate_prompt = f"Translate the following text to {target_language} and return only the translated text with no additional explanation or commentary:\n\n{transcription}"
        translate_contents = [
            types.Content(
                role="user",
                parts=[
                    types.Part.from_text(text=translate_prompt),
                ],
            ),
        ]
        translated_text = _generate_text(translate_contents, generate_content_config)

        # Generate TTS
        temp_output_path = _synthesize_speech(translated_text, target_language)
        if temp_output_path is None:
            return jsonify({'error': 'Speech synthesis produced no audio'}), 500

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        })
    except Exception as e:
        # Top-level request boundary: log with traceback, report as JSON.
        app.logger.exception("Error processing request: %s", e)
        return jsonify({'error': str(e)}), 500
    finally:
        # The uploaded audio is only needed while this request is handled.
        if temp_input_path is not None:
            try:
                os.remove(temp_input_path)
            except OSError:
                pass
def download_file(filename):
    """Send a generated audio file from the temp directory as an attachment.

    Args:
        filename: Base name of a ``.wav`` or ``.mp3`` file in the temp
            directory (the name embedded in the ``audio_url`` returned by
            the translation handler).

    Returns:
        The file response, or a JSON ``error`` with status 404 when the name
        is not a plain ``.wav``/``.mp3`` file name or the file does not exist.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view of the file -- confirm it is registered as a route.

    # Only the two formats the TTS step writes are served, each with its own
    # media type (WAV files are not "audio/mpeg").
    mimetype_by_extension = {".wav": "audio/wav", ".mp3": "audio/mpeg"}
    extension = os.path.splitext(filename)[1].lower()

    # Reject anything that is not a plain file name (e.g. "..", path
    # separators) or that has another extension, so unrelated files in the
    # shared temp directory cannot be requested.
    if secure_filename(filename) != filename or extension not in mimetype_by_extension:
        return jsonify({'error': 'File not found'}), 404

    try:
        return send_file(
            os.path.join(tempfile.gettempdir(), filename),
            mimetype=mimetype_by_extension[extension],
            as_attachment=True,
            download_name=f"translated_{filename}"
        )
    except FileNotFoundError:
        return jsonify({'error': 'File not found'}), 404
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it allows arbitrary code execution for anyone who
    # can reach the port. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)