"""Flask service: chunked audio upload, transcription, translation and TTS.

Workflow exposed over HTTP:

1. ``POST /upload-chunk``  - store one chunk of an audio file on disk.
2. ``POST /process-file``  - reassemble the chunks, transcribe and translate
   the audio with Gemini, then synthesize speech with Kokoro or gTTS.
3. ``GET /download/<filename>`` - fetch the synthesized audio file.
"""

import hashlib
import os
import shutil
import tempfile

import google.generativeai as genai
import numpy as np
import soundfile as sf
from flask import Flask, request, jsonify, send_file, send_from_directory
from flask_cors import CORS
from gtts import gTTS, lang
from kokoro import KPipeline
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import secure_filename

app = Flask(__name__, static_folder='static')
CORS(app, supports_credentials=True)
app.config.update(
    MAX_CONTENT_LENGTH=100 * 1024 * 1024,  # 100MB
    SECRET_KEY=os.urandom(24),
    SESSION_COOKIE_SAMESITE='Lax'
)
app.wsgi_app = ProxyFix(app.wsgi_app)

# Configure Gemini API
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Language configurations
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p"
}

GTTS_LANGUAGES = lang.tts_langs()
GTTS_LANGUAGES['ja'] = 'Japanese'

# Union of the display names offered by both TTS back ends, de-duplicated.
SUPPORTED_LANGUAGES = sorted(
    set(KOKORO_LANGUAGES) | set(GTTS_LANGUAGES.values())
)

# Mimetype served for each audio suffix this module writes
# (.wav from the Kokoro branch, .mp3 from the gTTS branch).
_AUDIO_MIMETYPES = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
}


def _chunk_dir_for(file_hash):
    """Return the temp directory that holds the chunks of *file_hash*.

    The identifier comes straight from the client, so it is reduced to a
    safe single path component before being joined to the temp directory.

    Raises:
        ValueError: if nothing usable is left after sanitising.
    """
    safe_hash = secure_filename(str(file_hash))
    if not safe_hash:
        raise ValueError("Invalid fileHash")
    return os.path.join(tempfile.gettempdir(), safe_hash)


@app.route('/')
def serve_index():
    """Serve ``index.html`` from the static folder."""
    return send_from_directory(app.static_folder, 'index.html')


@app.route('/languages')
def get_languages():
    """Return the sorted list of supported language names as JSON."""
    return jsonify(SUPPORTED_LANGUAGES)


@app.route('/upload-chunk', methods=['POST'])
def upload_chunk():
    """Store one uploaded chunk under a per-file temp directory.

    Expects multipart form data with ``file``, ``chunkIndex``,
    ``totalChunks`` and ``fileHash``. Responds with
    ``{'status': 'success', 'received': <index>}``; malformed input yields
    a 400 and any other failure a 500, both as ``{'error': <message>}``.
    """
    try:
        chunk = request.files['file']
        chunk_index = int(request.form['chunkIndex'])
        total_chunks = int(request.form['totalChunks'])
        chunk_dir = _chunk_dir_for(request.form['fileHash'])
        if not 0 <= chunk_index < total_chunks:
            raise ValueError("chunkIndex out of range")
    except (KeyError, ValueError) as e:
        return jsonify({'error': str(e)}), 400

    try:
        # Save chunk to temp directory
        os.makedirs(chunk_dir, exist_ok=True)
        chunk.save(os.path.join(chunk_dir, f"{chunk_index:04d}"))
        return jsonify({'status': 'success', 'received': chunk_index})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/process-file', methods=['POST'])
def process_file():
    """Reassemble the uploaded chunks and run the audio pipeline on them.

    Expects a JSON body with ``fileHash`` and ``language``. On success the
    response is the dict returned by ``process_audio``; malformed input
    yields a 400 and any other failure a 500, both as
    ``{'error': <message>}``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Expected a JSON object body'}), 400
    try:
        chunk_dir = _chunk_dir_for(payload['fileHash'])
        target_language = payload['language']
    except (KeyError, ValueError) as e:
        return jsonify({'error': str(e)}), 400
    if not os.path.isdir(chunk_dir):
        return jsonify({'error': 'No uploaded chunks found'}), 400

    # Same location as before: <tempdir>/<fileHash>.wav
    final_path = chunk_dir + ".wav"
    try:
        # Order numerically: a plain string sort would misplace indices that
        # outgrow the 4-digit zero padding used when saving.
        chunk_names = sorted(
            (name for name in os.listdir(chunk_dir) if name.isdigit()),
            key=int,
        )

        # Reassemble file, streaming each chunk instead of loading it whole.
        with open(final_path, 'wb') as output_file:
            for chunk_name in chunk_names:
                with open(os.path.join(chunk_dir, chunk_name), 'rb') as chunk_file:
                    shutil.copyfileobj(chunk_file, output_file)

        # Process file
        result = process_audio(final_path, target_language)

        # Chunks are removed only after success so a failed run can be
        # retried without re-uploading; cleanup itself is best effort.
        shutil.rmtree(chunk_dir, ignore_errors=True)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # The reassembled copy can always be rebuilt from the chunks.
        if os.path.exists(final_path):
            os.remove(final_path)


def _synthesize_speech(text, target_language):
    """Write *text* as speech to a new temp file and return its path.

    Uses Kokoro (``.wav``) when *target_language* is a key of
    ``KOKORO_LANGUAGES``, otherwise gTTS (``.mp3``), defaulting to ``'en'``
    when the language name is not in ``GTTS_LANGUAGES``.

    Raises:
        ValueError: if Kokoro yields no audio segments.
    """
    use_kokoro = target_language in KOKORO_LANGUAGES

    audio_data = None
    if use_kokoro:
        pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
        # NOTE(review): the same voice is used for every Kokoro language;
        # confirm "af_heart" is appropriate outside American English.
        generator = pipeline(text, voice="af_heart", speed=1)

        # Collect all audio segments
        audio_segments = [audio for _, _, audio in generator if audio is not None]
        if not audio_segments:
            raise ValueError("No audio generated by Kokoro")
        audio_data = np.concatenate(audio_segments)

    fd, output_path = tempfile.mkstemp(suffix=".wav" if use_kokoro else ".mp3")
    # mkstemp hands back an open descriptor; the writers below reopen the
    # file by path, so close it here to avoid leaking one per request.
    os.close(fd)
    try:
        if use_kokoro:
            sf.write(output_path, audio_data, 24000)
        else:
            # Fallback to gTTS
            lang_code = next(
                (k for k, v in GTTS_LANGUAGES.items() if v == target_language),
                'en',
            )
            gTTS(text, lang=lang_code).save(output_path)
    except BaseException:
        # Do not leave an empty or partial file behind when synthesis fails.
        os.remove(output_path)
        raise
    return output_path


def process_audio(file_path, target_language):
    """Transcribe, translate and voice the audio file at *file_path*.

    Args:
        file_path: path of the audio file to upload to Gemini.
        target_language: language name to translate into and speak.

    Returns:
        A dict with ``transcription``, ``translation`` and ``audio_url``
        (a ``/download/<basename>`` link to the synthesized file).
    """
    # Transcribe using Gemini
    model = genai.GenerativeModel("gemini-2.0-flash")
    uploaded_file = genai.upload_file(path=file_path)

    try:
        response = model.generate_content(["Transcribe this audio file:", uploaded_file])
        transcription = response.text.strip()

        # Translate
        prompt = f"Translate to {target_language} preserving meaning and cultural nuances. Respond only with the translation:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # Generate TTS
        temp_output_path = _synthesize_speech(translated_text, target_language)

        return {
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        }
    finally:
        # Remove the remote copy whether or not processing succeeded.
        uploaded_file.delete()


@app.route('/download/<filename>')
def download_file(filename):
    """Send a synthesized audio file from the temp directory as an attachment.

    Only plain ``.wav`` / ``.mp3`` file names are served; anything else, or
    a file that does not exist, yields a JSON 404.
    """
    safe_name = secure_filename(filename)
    suffix = os.path.splitext(safe_name)[1].lower()
    mimetype = _AUDIO_MIMETYPES.get(suffix)
    # Reject names that needed sanitising or are not audio written here, so
    # the endpoint cannot be used to read other files in the temp directory.
    if safe_name != filename or mimetype is None:
        return jsonify({'error': 'File not found'}), 404

    try:
        return send_file(
            os.path.join(tempfile.gettempdir(), safe_name),
            mimetype=mimetype,
            as_attachment=True,
            download_name=f"translated_{safe_name}"
        )
    except FileNotFoundError:
        return jsonify({'error': 'File not found'}), 404


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)