File size: 6,109 Bytes
5f33e0e
7582b7f
d0dd39c
ab0df5d
dbe8a71
c07d698
6ebed08
 
7cc4829
 
dbe8a71
6ebed08
5ddb059
 
7cc4829
 
5ddb059
 
 
 
dbe8a71
7cc4829
6ebed08
 
 
 
 
 
 
 
 
 
 
7cc4829
6c131f6
317b2f2
fca0d6a
6ebed08
5ddb059
 
6c131f6
5ddb059
6c131f6
5ddb059
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
965bd2d
5ddb059
 
 
 
6c131f6
 
5ddb059
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f33e0e
6c131f6
5ddb059
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6ebed08
 
7cc4829
70e979d
6ebed08
 
 
 
7582b7f
 
 
 
70e979d
7582b7f
 
 
 
6ebed08
 
7582b7f
 
6ebed08
5ddb059
6ebed08
 
 
 
7cc4829
5ddb059
7cc4829
 
 
5ddb059
 
 
 
 
 
 
 
 
 
 
 
 
 
6c131f6
5ddb059
 
dbe8a71
7cc4829
 
 
 
 
 
 
 
ef2c8e0
7cc4829
 
dbe8a71
7cc4829
d060ce1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import os
import numpy as np
from flask import Flask, request, jsonify, send_file, send_from_directory
import google.generativeai as genai
from gtts import gTTS, lang
import tempfile
import soundfile as sf
from kokoro import KPipeline
from werkzeug.utils import secure_filename
from flask_cors import CORS

# Flask application object; CORS(app) applies flask_cors with its defaults.
app = Flask(__name__, static_folder='static')
CORS(app)
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 ** 2  # 50MB request-size cap

# Gemini credentials are read from the environment; importing this module
# fails immediately when the key is missing.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")
genai.configure(api_key=GEMINI_API_KEY)

# Display name -> single-letter language code passed to KPipeline.
KOKORO_LANGUAGES = {
    "American English": "a",
    "British English": "b",
    "Mandarin Chinese": "z",
    "Spanish": "e",
    "French": "f",
    "Hindi": "h",
    "Italian": "i",
    "Brazilian Portuguese": "p",
}

# gTTS language code -> display name, with Japanese forced into the table.
GTTS_LANGUAGES = lang.tts_langs()
GTTS_LANGUAGES.update(ja='Japanese')

# Every display name offered by either engine, de-duplicated and sorted.
SUPPORTED_LANGUAGES = sorted({*KOKORO_LANGUAGES, *GTTS_LANGUAGES.values()})

def upload_large_file(file_path):
    """Upload a file to the Gemini File API, chunking it if it is too large.

    A single upload is attempted first. Only when that fails with an error
    whose message contains "payload size exceeds" is the file split into
    20MB pieces that are uploaded one by one.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The uploaded file handle, or a list of handles (one per chunk, in
        file order) when the chunked fallback was used.

    Raises:
        Exception: Any upload error other than the payload-size one is
            re-raised unchanged. If a chunk fails to write or upload, the
            chunks uploaded so far are deleted (best effort) and the error
            is re-raised.
    """
    try:
        return genai.upload_file(path=file_path)
    except Exception as e:
        if "payload size exceeds" not in str(e).lower():
            raise

    # NOTE(review): the file is split at raw byte boundaries, so for
    # container/compressed audio formats the later chunks may not be
    # decodable on their own -- confirm this fallback works for real inputs.
    chunk_size = 20 * 1024 * 1024  # 20MB chunks
    # Keep the original extension on every part; presumably upload_file
    # infers the MIME type from it -- TODO confirm against the File API docs.
    root, ext = os.path.splitext(file_path)
    file_parts = []

    try:
        with open(file_path, 'rb') as f:
            i = 0
            while chunk := f.read(chunk_size):
                part_path = f"{root}_part{i}{ext}"
                try:
                    with open(part_path, 'wb') as part_file:
                        part_file.write(chunk)
                    file_parts.append(genai.upload_file(path=part_path))
                finally:
                    # Remove the local part even when the write/upload failed.
                    if os.path.exists(part_path):
                        os.remove(part_path)
                i += 1
    except Exception:
        # Do not leave already-uploaded chunks orphaned on the remote side.
        for part in file_parts:
            try:
                part.delete()
            except Exception:
                app.logger.warning(
                    "Failed to delete uploaded chunk during cleanup",
                    exc_info=True,
                )
        raise

    return file_parts

def _new_temp_path(suffix):
    """Create an empty, uniquely named temp file and return its path."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # mkstemp hands back an open descriptor; we only need the path
    return path


def _delete_remote_files(files):
    """Best-effort delete of uploaded Gemini files; empties *files* in place."""
    while files:
        remote = files.pop()
        try:
            remote.delete()
        except Exception:
            app.logger.warning("Failed to delete uploaded file", exc_info=True)


@app.route('/translate', methods=['POST'])
def translate_audio():
    """Transcribe an uploaded audio file, translate it and synthesize speech.

    Expects a multipart POST with an ``audio`` file and an optional
    ``language`` form field (defaults to ``'English'``). The audio is
    uploaded to Gemini for transcription, the transcript is translated by
    the same model, and the translation is rendered to audio with Kokoro
    (for names in ``KOKORO_LANGUAGES``) or gTTS (everything else, falling
    back to ``'en'`` when the name is unknown).

    Returns:
        JSON with ``transcription``, ``translation`` and ``audio_url``
        (a ``/download/<name>`` path) on success; JSON ``{'error': ...}``
        with status 400 for a missing/invalid upload or 500 for any
        processing failure.
    """
    temp_input_path = None
    temp_output_path = None
    # Remote files that still need deleting; drained exactly once so a file
    # is never deleted twice (once on success and again in error cleanup).
    pending_remote = []

    try:
        if 'audio' not in request.files:
            return jsonify({'error': 'No audio file uploaded'}), 400

        audio_file = request.files['audio']
        target_language = request.form.get('language', 'English')

        if not audio_file or audio_file.filename == '':
            return jsonify({'error': 'Invalid audio file'}), 400

        # Save under a unique name: concurrent requests with the same client
        # filename must not overwrite each other, and secure_filename() may
        # return an empty string. Only the sanitized extension is kept.
        suffix = os.path.splitext(secure_filename(audio_file.filename))[1]
        temp_input_path = _new_temp_path(suffix)
        audio_file.save(temp_input_path)

        # Upload using File API
        uploaded_file = upload_large_file(temp_input_path)
        is_chunked = isinstance(uploaded_file, list)
        pending_remote = list(uploaded_file) if is_chunked else [uploaded_file]

        # Get transcription
        model = genai.GenerativeModel("gemini-2.0-flash-lite")

        if is_chunked:
            transcripts = []
            for chunk in uploaded_file:
                response = model.generate_content(["Transcribe this audio chunk:", chunk])
                transcripts.append(response.text)
            transcription = " ".join(transcripts)
        else:
            response = model.generate_content(["Transcribe this audio file:", uploaded_file])
            transcription = response.text

        # The remote copies are no longer needed once transcription is done.
        _delete_remote_files(pending_remote)

        # Translate text using Gemini
        prompt = f"Translate the following text to {target_language} preserving meaning and cultural nuances. Respond only with the translation:\n\n{transcription}"
        response = model.generate_content(prompt)
        translated_text = response.text.strip()

        # Generate TTS
        if target_language in KOKORO_LANGUAGES:
            pipeline = KPipeline(lang_code=KOKORO_LANGUAGES[target_language])
            generator = pipeline(translated_text, voice="af_heart", speed=1)

            # Collect all audio segments
            audio_segments = [audio for _, _, audio in generator if audio is not None]
            if not audio_segments:
                raise ValueError("No audio generated by Kokoro")

            audio_data = np.concatenate(audio_segments)
            temp_output_path = _new_temp_path(".wav")
            sf.write(temp_output_path, audio_data, 24000)
        else:
            # Standard gTTS handling
            lang_code = next((k for k, v in GTTS_LANGUAGES.items() if v == target_language), 'en')
            tts = gTTS(translated_text, lang=lang_code)
            temp_output_path = _new_temp_path(".mp3")
            tts.save(temp_output_path)

        return jsonify({
            'transcription': transcription,
            'translation': translated_text,
            'audio_url': f'/download/{os.path.basename(temp_output_path)}'
        })

    except Exception as e:
        # A partially written output file is useless to the client.
        if temp_output_path and os.path.exists(temp_output_path):
            os.remove(temp_output_path)
        app.logger.exception("Error processing request: %s", e)
        # NOTE(review): str(e) is returned to the client as before; consider
        # a generic message if internal details should not be exposed.
        return jsonify({'error': str(e)}), 500
    finally:
        # Runs on every path: drop remaining remote files and the local input.
        _delete_remote_files(pending_remote)
        if temp_input_path and os.path.exists(temp_input_path):
            os.remove(temp_input_path)

@app.route('/download/<filename>')
def download_file(filename):
    """Send a generated audio file from the system temp directory.

    Only ``.mp3`` and ``.wav`` names are served, since the temp directory
    is shared with unrelated files. The Content-Type follows the extension
    (the original always answered ``audio/mpeg``, even for WAV output).

    Args:
        filename: Base name of the file, as embedded in ``audio_url``.

    Returns:
        The file as an attachment named ``translated_<filename>``, or JSON
        ``{'error': 'File not found'}`` with status 404 when the name is
        not an allowed audio file, is unsafe, or does not exist.
    """
    from werkzeug.exceptions import NotFound

    audio_mimetypes = {'.mp3': 'audio/mpeg', '.wav': 'audio/wav'}
    mimetype = audio_mimetypes.get(os.path.splitext(filename)[1].lower())
    if mimetype is None:
        return jsonify({'error': 'File not found'}), 404

    try:
        # send_from_directory refuses paths that escape the directory and
        # raises NotFound for missing or non-file targets.
        return send_from_directory(
            tempfile.gettempdir(),
            filename,
            mimetype=mimetype,
            as_attachment=True,
            download_name=f"translated_{filename}",
        )
    except (NotFound, FileNotFoundError):
        return jsonify({'error': 'File not found'}), 404

if __name__ == '__main__':
    # Direct execution only: serve on all interfaces, port 7820.
    app.run(
        port=7820,
        host="0.0.0.0",
    )