Spaces:
Running
Running
File size: 6,109 Bytes
5f33e0e 7582b7f d0dd39c ab0df5d dbe8a71 c07d698 6ebed08 7cc4829 dbe8a71 6ebed08 5ddb059 7cc4829 5ddb059 dbe8a71 7cc4829 6ebed08 7cc4829 6c131f6 317b2f2 fca0d6a 6ebed08 5ddb059 6c131f6 5ddb059 6c131f6 5ddb059 965bd2d 5ddb059 6c131f6 5ddb059 5f33e0e 6c131f6 5ddb059 6ebed08 7cc4829 70e979d 6ebed08 7582b7f 70e979d 7582b7f 6ebed08 7582b7f 6ebed08 5ddb059 6ebed08 7cc4829 5ddb059 7cc4829 5ddb059 6c131f6 5ddb059 dbe8a71 7cc4829 ef2c8e0 7cc4829 dbe8a71 7cc4829 d060ce1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import os
import numpy as np
from flask import Flask, request, jsonify, send_file, send_from_directory
import google.generativeai as genai
from gtts import gTTS, lang
import tempfile
import soundfile as sf
from kokoro import KPipeline
from werkzeug.utils import secure_filename
from flask_cors import CORS
# Flask application object; 'static' is registered as its static folder.
app = Flask(__name__, static_folder='static')
CORS(app)
# Cap the size of incoming request bodies at 50 MiB.
app.config.update(MAX_CONTENT_LENGTH=50 * 1024 * 1024)
# Configure the Gemini client from the environment; fail fast at import time
# when the key is missing or empty so the app never starts half-configured.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")
genai.configure(api_key=GEMINI_API_KEY)
# Language configurations

# Display name -> single-letter code handed to KPipeline(lang_code=...).
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}

# gTTS language code -> display name, with Japanese forced to be present.
GTTS_LANGUAGES = {**lang.tts_langs(), 'ja': 'Japanese'}

# Every display name either engine can speak, de-duplicated and sorted.
SUPPORTED_LANGUAGES = sorted(
    set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values())
)
def upload_large_file(file_path):
    """Upload a file through the Gemini File API, splitting it if too large.

    The file is first uploaded in one piece. Only when that attempt fails
    with a "payload size exceeds" error is the file split into 20 MB parts
    that are uploaded one by one.

    NOTE(review): parts are raw byte slices of the original file, so parts
    after the first presumably lack the container header -- confirm that the
    model can actually decode them for the audio formats in use.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The uploaded file object, or a list of uploaded file objects (in
        file order) when the file had to be split.

    Raises:
        Exception: Any upload error other than the payload-size error is
            re-raised unchanged; errors raised while uploading parts are
            re-raised after the parts uploaded so far have been deleted.
    """
    try:
        return genai.upload_file(path=file_path)
    except Exception as e:
        if "payload size exceeds" not in str(e).lower():
            raise

    chunk_size = 20 * 1024 * 1024  # 20MB chunks
    # Keep the original extension at the end of each part name so that any
    # extension-based MIME type detection still works for the parts.
    base, ext = os.path.splitext(file_path)
    file_parts = []
    try:
        with open(file_path, 'rb') as f:
            i = 0
            while chunk := f.read(chunk_size):
                part_path = f"{base}_part{i}{ext}"
                try:
                    with open(part_path, 'wb') as part_file:
                        part_file.write(chunk)
                    file_parts.append(genai.upload_file(path=part_path))
                finally:
                    # Remove the local part even when the upload fails.
                    if os.path.exists(part_path):
                        os.remove(part_path)
                i += 1
    except Exception:
        # The caller never receives the partial list, so release the remote
        # parts here (best effort) before propagating the error.
        for part in file_parts:
            try:
                part.delete()
            except Exception as cleanup_error:
                app.logger.warning(
                    "Failed to delete uploaded part: %s", cleanup_error
                )
        raise
    return file_parts
def _delete_uploaded(uploaded):
    """Best-effort deletion of a Gemini upload (one file or a list of them)."""
    if not uploaded:
        return
    remote_files = uploaded if isinstance(uploaded, list) else [uploaded]
    for remote in remote_files:
        try:
            remote.delete()
        except Exception as cleanup_error:
            # Cleanup must never mask the outcome of the request itself.
            app.logger.warning(
                "Failed to delete uploaded file: %s", cleanup_error
            )


def _transcribe(model, uploaded):
    """Return the transcript of one upload, or of chunk uploads joined by spaces."""
    if isinstance(uploaded, list):
        return " ".join(
            model.generate_content(["Transcribe this audio chunk:", chunk]).text
            for chunk in uploaded
        )
    return model.generate_content(
        ["Transcribe this audio file:", uploaded]
    ).text


def _synthesize_speech(text, target_language):
    """Render text to an audio file in the temp directory and return its path.

    Kokoro is used for the languages listed in KOKORO_LANGUAGES (WAV output
    written at 24000 Hz); every other language goes through gTTS (MP3
    output), falling back to 'en' when the name is not a known gTTS language.
    """
    if target_language in KOKORO_LANGUAGES:
        pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
        # NOTE(review): the same voice is used for every Kokoro language --
        # confirm that "af_heart" is appropriate for non-English output.
        audio_segments = [
            audio
            for _, _, audio in pipeline(text, voice="af_heart", speed=1)
            if audio is not None
        ]
        if not audio_segments:
            raise ValueError("No audio generated by Kokoro")
        fd, output_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)  # mkstemp returns an open descriptor; only the path is needed
        try:
            sf.write(output_path, np.concatenate(audio_segments), 24000)
        except Exception:
            os.remove(output_path)
            raise
        return output_path

    # Standard gTTS handling
    lang_code = next(
        (k for k, v in GTTS_LANGUAGES.items() if v == target_language), 'en'
    )
    tts = gTTS(text, lang=lang_code)
    fd, output_path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)  # mkstemp returns an open descriptor; only the path is needed
    try:
        tts.save(output_path)
    except Exception:
        os.remove(output_path)
        raise
    return output_path


@app.route('/translate', methods=['POST'])
def translate_audio():
    """Transcribe an uploaded audio file, translate it and synthesize speech.

    Expects a multipart form with an 'audio' file and an optional 'language'
    field (defaults to 'English').

    Returns:
        A JSON response with 'transcription', 'translation' and 'audio_url'
        on success; a JSON 'error' with status 400 for a missing or unnamed
        file, or status 500 for any failure during processing.
    """
    temp_input_path = None
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400
        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')
        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Save to a uniquely named temp file. Only the sanitized extension of
        # the client-supplied name is kept, so concurrent requests cannot
        # overwrite each other and an unusable name cannot break the path.
        suffix = os.path.splitext(secure_filename(audio_file.filename))[1]
        fd, temp_input_path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        audio_file.save(temp_input_path)

        # Upload using File API
        uploaded_file = upload_large_file(temp_input_path)

        # Get transcription; the remote upload(s) are released exactly once,
        # whether or not transcription succeeds.
        model = genai.GenerativeModel("gemini-2.0-flash-lite")
        try:
            transcription = _transcribe(model, uploaded_file)
        finally:
            _delete_uploaded(uploaded_file)

        # Translate text using Gemini
        prompt = f"Translate the following text to {target_language} preserving meaning and cultural nuances. Respond only with the translation:\n\n{transcription}"
        translated_text = model.generate_content(prompt).text.strip()

        # Generate TTS
        temp_output_path = _synthesize_speech(translated_text, target_language)

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        })
    except Exception as e:
        app.logger.exception("Error processing request: %s", e)
        return jsonify({'error': str(e)}), 500
    finally:
        # Single place where the local input copy is removed.
        if temp_input_path and os.path.exists(temp_input_path):
            os.remove(temp_input_path)
@app.route('/download/<filename>')
def download_file(filename):
    """Send a generated audio file from the temp directory as an attachment.

    Only plain file names ending in '.wav' or '.mp3' are served, matching the
    two suffixes the translation endpoint creates. Anything else gets the
    same 404 response as a missing file, so unrelated files in the shared
    temp directory are not exposed.

    Args:
        filename: Base name of the file, as given in the 'audio_url' of the
            translation response.

    Returns:
        The file as an attachment named 'translated_<filename>', or a JSON
        error with status 404.
    """
    # Content type by extension: Kokoro output is WAV, gTTS output is MP3.
    audio_mimetypes = {".wav": "audio/wav", ".mp3": "audio/mpeg"}

    extension = os.path.splitext(filename)[1].lower()
    # secure_filename leaves an already-safe name unchanged, so a mismatch
    # means the name contained separators, '..' or other unsafe characters.
    if secure_filename(filename) != filename or extension not in audio_mimetypes:
        return jsonify({'error': 'File not found'}), 404

    try:
        return send_file(
            os.path.join(tempfile.gettempdir(), filename),
            mimetype=audio_mimetypes[extension],
            as_attachment=True,
            download_name=f"translated_{filename}"
        )
    except FileNotFoundError:
        return jsonify({'error': 'File not found'}), 404
# Start the development server on all interfaces, port 7820, when this file
# is run directly.
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7820)