Spaces:
Running
Running
import os | |
# import numpy as np # No longer needed for TTS | |
from flask import Flask, request, jsonify, send_file, send_from_directory | |
import google.generativeai as genai | |
from google.generativeai import types as genai_types # For clarity if needed, or use genai.types | |
# from gtts import gTTS, lang # Removed | |
import tempfile | |
# import soundfile as sf # Removed, using wave module instead | |
# from kokoro import KPipeline # Removed | |
from werkzeug.utils import secure_filename | |
from flask_cors import CORS | |
import wave # Added for saving WAV files | |
app = Flask(__name__, static_folder='static') | |
CORS(app) | |
# Configure Gemini API | |
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
if not GEMINI_API_KEY: | |
raise ValueError("GEMINI_API_KEY environment variable not set") | |
genai.configure(api_key=GEMINI_API_KEY) | |
# Transcription and Translation Model | |
TRANSCRIPTION_TRANSLATION_MODEL_NAME = "gemini-2.0-flash" # Using 1.5 flash as it's common, was "gemini-2.0-flash" | |
# Text-to-Speech Model | |
TTS_MODEL_NAME = "gemini-2.5-flash-preview-tts" # Using a model known to support audio output modality. | |
# The user's example mentioned "gemini-2.5-flash-preview-tts". | |
# If that specific model works with response_mime_type, it can be used. | |
# Gemini TTS Supported Languages (Display Name: BCP-47 Code) | |
# Based on the user-provided list. The TTS API auto-detects language from text. | |
# This list is primarily for the frontend language selector. | |
GEMINI_TTS_LANGUAGES = { | |
"Arabic (Egyptian)": "ar-EG", | |
"German (Germany)": "de-DE", | |
"English (US)": "en-US", | |
"Spanish (US)": "es-US", | |
"French (France)": "fr-FR", | |
"Hindi (India)": "hi-IN", | |
"Indonesian (Indonesia)": "id-ID", | |
"Italian (Italy)": "it-IT", | |
"Japanese (Japan)": "ja-JP", | |
"Korean (Korea)": "ko-KR", | |
"Portuguese (Brazil)": "pt-BR", | |
"Russian (Russia)": "ru-RU", | |
"Dutch (Netherlands)": "nl-NL", | |
"Polish (Poland)": "pl-PL", | |
"Thai (Thailand)": "th-TH", | |
"Turkish (Turkey)": "tr-TR", | |
"Vietnamese (Vietnam)": "vi-VN", | |
"Romanian (Romania)": "ro-RO", | |
"Ukrainian (Ukraine)": "uk-UA", | |
"Bengali (Bangladesh)": "bn-BD", | |
"English (India)": "en-IN", | |
"Marathi (India)": "mr-IN", | |
"Tamil (India)": "ta-IN", | |
"Telugu (India)": "te-IN" | |
} | |
SUPPORTED_LANGUAGES = sorted(list(GEMINI_TTS_LANGUAGES.keys())) | |
# Helper function to save PCM data as a WAV file | |
def save_wave_file(filename, pcm_data, channels=1, sample_width=2, frame_rate=24000): | |
"""Saves PCM audio data to a WAV file.""" | |
with wave.open(filename, "wb") as wf: | |
wf.setnchannels(channels) | |
wf.setsampwidth(sample_width) # Bytes per sample | |
wf.setframerate(frame_rate) | |
wf.writeframes(pcm_data) | |
def serve_index(): | |
return send_from_directory(app.static_folder, 'index.html') | |
def get_languages(): | |
return jsonify(SUPPORTED_LANGUAGES) | |
def translate_audio(): | |
try: | |
if 'audio' not in request.files: | |
return jsonify({'error': 'No audio file uploaded'}), 400 | |
audio_file = request.files['audio'] | |
target_language_display_name = request.form.get('language', 'English (US)') # Default to a common one | |
if not audio_file or audio_file.filename == '': | |
return jsonify({'error': 'Invalid audio file'}), 400 | |
# Validate MIME type for transcription | |
allowed_mime_types = ['audio/wav', 'audio/mpeg', 'audio/mp3', 'audio/ogg', 'audio/flac', 'audio/mp4', 'audio/webm', 'audio/amr'] | |
if audio_file.mimetype not in allowed_mime_types: | |
return jsonify({'error': f'Unsupported file type for transcription: {audio_file.mimetype}'}), 400 | |
# Initialize Gemini model for transcription and translation | |
model = genai.GenerativeModel(TRANSCRIPTION_TRANSLATION_MODEL_NAME) | |
audio_data_bytes = audio_file.read() | |
audio_blob = genai_types.Blob(mime_type=audio_file.mimetype, data=audio_data_bytes) | |
# Get transcription | |
# Forcing transcription to be in original language can be tricky if the model tends to translate. | |
# A more robust prompt might be needed if issues arise. | |
transcription_prompt = "You are a professional transcriber. Transcribe this audio accurately and verbatim in its original spoken language. Respond only with the transcription." | |
# Using genai.upload_file for larger files if needed, but for direct blob: | |
response = model.generate_content([transcription_prompt, audio_blob]) | |
transcription = response.text.strip() | |
# Translate text using Gemini | |
translation_prompt = f"Translate the following text to {target_language_display_name}. Preserve meaning and cultural nuances. Respond only with the translation:\n\n{transcription}" | |
response = model.generate_content(translation_prompt) | |
translated_text = response.text.strip() | |
# Generate TTS using Gemini | |
tts_model = genai.GenerativeModel(TTS_MODEL_NAME) | |
# Gemini TTS detects language from the text. | |
# The voice selection is typically handled by the model or default voice for the detected language. | |
# The user's snippet for `speech_config` and `voice_name='Kore'` is not directly compatible | |
# with the current `google-generativeai` SDK's `GenerativeModel.generate_content` method | |
# in a straightforward way. This method uses `response_mime_type` for audio output. | |
tts_generation_config = genai_types.GenerationConfig( | |
response_mime_type="audio/wav" # Gemini will output WAV audio | |
) | |
# The content for TTS is just the translated text. | |
tts_response = tts_model.generate_content( | |
contents=[translated_text], # Make sure contents is an iterable of Parts or strings | |
generation_config=tts_generation_config | |
) | |
if not (tts_response.candidates and tts_response.candidates[0].content.parts): | |
raise ValueError("Gemini TTS did not return audio data.") | |
audio_pcm_data = tts_response.candidates[0].content.parts[0].inline_data.data | |
_, temp_output_path = tempfile.mkstemp(suffix=".wav") | |
# Default parameters from the user's example: rate=24000, sample_width=2 (16-bit), channels=1 | |
save_wave_file(temp_output_path, audio_pcm_data, channels=1, sample_width=2, frame_rate=24000) | |
return jsonify({ | |
'transcription': transcription, | |
'translation': translated_text, | |
'audio_url': f'/download/{os.path.basename(temp_output_path)}' | |
}) | |
except Exception as e: | |
app.logger.error(f"Error processing request: {str(e)}", exc_info=True) | |
return jsonify({'error': str(e)}), 500 | |
def download_file(filename): | |
try: | |
# tempfile.gettempdir() is the directory where mkstemp creates files | |
file_path = os.path.join(tempfile.gettempdir(), filename) | |
return send_file( | |
file_path, | |
mimetype="audio/wav", # Changed from mpeg to wav | |
as_attachment=True, | |
download_name=f"translated_{filename.replace(tempfile.gettempdir(), '')}" # Cleaner name | |
) | |
except FileNotFoundError: | |
return jsonify({'error': 'File not found'}), 404 | |
except Exception as e: | |
app.logger.error(f"Error downloading file: {str(e)}", exc_info=True) | |
return jsonify({'error': f"Error downloading file: {str(e)}"}), 500 | |
if __name__ == '__main__': | |
# Consider adding an environment variable for debug mode for production | |
app.run(host="0.0.0.0", port=7860) # Added debug=True for development |