"""Flask service that transcribes, translates and re-voices uploaded audio.

Pipeline for ``POST /translate``: the uploaded audio is sent to Gemini for
transcription, the transcript is translated by Gemini, and the translation
is synthesized to speech with Kokoro (for the languages it is configured
for here) or gTTS (everything else).  The generated audio is then served
by ``GET /download/<filename>``.
"""

import os
import tempfile

import numpy as np
import google.generativeai as genai
import soundfile as sf
from flask import Flask, request, jsonify, send_file, send_from_directory
from flask_cors import CORS
from gtts import gTTS, lang
from kokoro import KPipeline
from werkzeug.exceptions import NotFound
from werkzeug.utils import secure_filename

app = Flask(__name__, static_folder='static')
CORS(app)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB limit

# Configure Gemini API
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")
genai.configure(api_key=GEMINI_API_KEY)

# Language configurations
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}
GTTS_LANGUAGES = lang.tts_langs()
GTTS_LANGUAGES['ja'] = 'Japanese'
# Union of every language name either TTS backend can be asked for.
SUPPORTED_LANGUAGES = sorted(
    set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values())
)

MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB Gemini limit
CHUNK_SIZE = 20 * 1024 * 1024  # 20MB chunks

# Generated audio lives in its own directory so the download route can only
# ever serve files this service produced, not arbitrary temp files.
OUTPUT_DIR = os.path.join(tempfile.gettempdir(), "translated_audio")


def _remove_local_file(path):
    """Delete *path* if it is set and exists; log (never raise) on failure."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        app.logger.error(f"Cleanup error: {str(e)}")


def _delete_remote_files(files):
    """Best-effort ``delete()`` of each uploaded file handle.

    Each deletion is attempted independently so that one failure (for
    example a handle that was already deleted) does not prevent the
    remaining handles from being cleaned up.
    """
    for remote in files:
        try:
            remote.delete()
        except Exception as e:  # best-effort cleanup: log and continue
            app.logger.error(f"Cleanup error: {str(e)}")


def _new_output_path(suffix):
    """Create an empty, uniquely named file in OUTPUT_DIR and return its path."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    fd, path = tempfile.mkstemp(suffix=suffix, dir=OUTPUT_DIR)
    # mkstemp returns an open descriptor; the writers below reopen by path.
    os.close(fd)
    return path


def process_large_audio(file_path):
    """Upload an audio file to Gemini, splitting it into chunks if too large.

    Files up to MAX_FILE_SIZE bytes are uploaded as-is.  Larger files are
    split into CHUNK_SIZE-byte pieces which are uploaded one by one; each
    local piece is removed as soon as its upload call returns.

    NOTE(review): the split is a raw byte split, so pieces after the first
    carry no container header of their own -- confirm the transcription
    backend copes with that for the formats you expect.

    Args:
        file_path: Path of the local audio file.

    Returns:
        A list of the objects returned by ``genai.upload_file``, in order.

    Raises:
        RuntimeError: If reading, splitting or uploading fails.  Anything
            uploaded before the failure is deleted again (best effort).
    """
    uploaded = []
    try:
        if os.path.getsize(file_path) <= MAX_FILE_SIZE:
            # Process small files normally
            uploaded.append(genai.upload_file(file_path))
            return uploaded

        # Keep the original extension at the end of each chunk name;
        # presumably the upload infers the mime type from it -- TODO confirm.
        root, ext = os.path.splitext(file_path)
        with open(file_path, 'rb') as source:
            chunk_num = 0
            while chunk_data := source.read(CHUNK_SIZE):
                chunk_path = f"{root}_chunk_{chunk_num}{ext}"
                try:
                    with open(chunk_path, 'wb') as chunk_file:
                        chunk_file.write(chunk_data)
                    uploaded.append(genai.upload_file(chunk_path))
                finally:
                    # The local piece is not needed once the upload call ends.
                    _remove_local_file(chunk_path)
                chunk_num += 1
        return uploaded
    except Exception as e:
        # Do not leave already-uploaded pieces behind on failure.
        _delete_remote_files(uploaded)
        raise RuntimeError(f"File processing failed: {str(e)}") from e


def cleanup_files(file_path, chunks):
    """Remove the local upload and delete every uploaded chunk.

    Never raises: each step is attempted independently and failures are
    logged.

    Args:
        file_path: Local path of the saved upload, or None if nothing was
            saved.
        chunks: Uploaded file handles as returned by ``process_large_audio``.
    """
    _remove_local_file(file_path)
    _delete_remote_files(chunks)


def _synthesize_speech(text, target_language):
    """Render *text* to an audio file in OUTPUT_DIR and return its path.

    Kokoro is used (24 kHz WAV) when *target_language* is a key of
    KOKORO_LANGUAGES; otherwise gTTS is used (MP3), falling back to the
    'en' voice when the language name is not in GTTS_LANGUAGES.

    Raises:
        ValueError: If Kokoro yields no audio at all.
    """
    use_kokoro = target_language in KOKORO_LANGUAGES
    output_path = _new_output_path(".wav" if use_kokoro else ".mp3")
    try:
        if use_kokoro:
            # Kokoro processing
            pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
            generator = pipeline(text, voice="af_heart", speed=1)
            audio_segments = [
                audio for _, _, audio in generator if audio is not None
            ]
            if not audio_segments:
                raise ValueError("No audio generated by Kokoro")
            sf.write(output_path, np.concatenate(audio_segments), 24000)
        else:
            # gTTS processing
            lang_code = next(
                (code for code, name in GTTS_LANGUAGES.items()
                 if name == target_language),
                'en',
            )
            gTTS(text, lang=lang_code).save(output_path)
    except Exception:
        # Do not leave an empty or partial file that the download route
        # could serve.
        _remove_local_file(output_path)
        raise
    return output_path


@app.route('/translate', methods=['POST'])
def translate_audio():
    """Transcribe, translate and re-voice an uploaded audio file.

    Form fields:
        audio: The audio file (required).
        language: Target language name (default ``'English'``).

    Returns:
        JSON with ``transcription``, ``translation`` and ``audio_url`` on
        success; JSON ``{'error': ...}`` with status 400 for a missing or
        invalid upload and 500 for any processing failure.
    """
    temp_path = None
    uploaded_chunks = []
    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400

        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')

        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Save to a uniquely named temp file.  Only the (sanitized)
        # extension of the client-supplied name is kept, so concurrent
        # uploads with the same name cannot overwrite each other.
        ext = os.path.splitext(secure_filename(audio_file.filename))[1]
        fd, temp_path = tempfile.mkstemp(suffix=ext)
        os.close(fd)
        audio_file.save(temp_path)

        # Process file in chunks if needed
        uploaded_chunks = process_large_audio(temp_path)

        # Transcribe chunks.  The uploaded chunks are deleted once, in the
        # ``finally`` clause below.
        model = genai.GenerativeModel("gemini-2.0-flash")
        transcripts = []
        for chunk in uploaded_chunks:
            response = model.generate_content(
                ["Transcribe this audio chunk verbatim. Respond only with the transcription:", chunk]
            )
            transcripts.append(response.text.strip())
        transcription = " ".join(transcripts)

        # Translation
        prompt = f"Translate to {target_language} preserving meaning:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # TTS Generation
        output_path = _synthesize_speech(translated_text, target_language)

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(output_path)}'
        })
    except Exception as e:
        app.logger.exception(f"Processing error: {str(e)}")
        return jsonify({'error': str(e)}), 500
    finally:
        cleanup_files(temp_path, uploaded_chunks)


@app.route('/download/<filename>')
def download_file(filename):
    """Serve a generated audio file from OUTPUT_DIR as an attachment.

    The mimetype is left for Flask to derive from the file name, because
    the outputs are either ``.wav`` (Kokoro) or ``.mp3`` (gTTS).

    Returns:
        The file response, or JSON ``{'error': ...}`` with status 404 when
        the file does not exist or the name would escape OUTPUT_DIR.
    """
    try:
        return send_from_directory(
            OUTPUT_DIR,
            filename,
            as_attachment=True,
            download_name=f"translated_{filename}"
        )
    except NotFound:
        return jsonify({'error': 'File not found'}), 404


if __name__ == '__main__':
    # The interactive debugger allows code execution from the browser, so it
    # must not be on by default while listening on all interfaces.  Set
    # FLASK_DEBUG=1 to opt in during local development.
    app.run(host='0.0.0.0', port=5000, debug=os.getenv("FLASK_DEBUG") == "1")