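"""Gradio app: record or upload audio, transcribe it in 30-second chunks with
openai/whisper-base, then summarize the transcript with facebook/bart-large-cnn.
Intended to run as a Hugging Face Space (the `spaces` import provides the
ZeroGPU decorator)."""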
import gradio as gr
import torch
from transformers import pipeline
import librosa
import soundfile as sf
import os
import uuid
import spaces
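
# `spaces` only exists inside a Hugging Face Space. A minimal sketch for running
# this script locally (an assumption, not part of the original app) is to guard
# the import:
#
#   try:
#       import spaces
#   except ImportError:
#       class spaces:  # no-op stand-in so @spaces.GPU works on a plain machine
#           GPU = staticmethod(lambda **kwargs: (lambda fn: fn))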

# Directory to save recorded audio files
OUTPUT_DIR = os.getenv("HF_HOME", ".")  # Base under HF_HOME when set, otherwise the current directory
OUTPUT_DIR = os.path.join(OUTPUT_DIR, "recorded_audio_files")
os.makedirs(OUTPUT_DIR, exist_ok=True)

def split_audio(audio_data, sr, chunk_duration=30):
    """Split audio into chunks of chunk_duration seconds."""
    chunks = []
    for start in range(0, len(audio_data), int(chunk_duration * sr)):
        end = start + int(chunk_duration * sr)
        chunks.append(audio_data[start:end])
    return chunks
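
# Worked example (illustrative numbers): a 65 s clip at sr=16000 has 1,040,000
# samples; range(0, 1040000, 480000) starts chunks at 0, 480000, and 960000,
# giving two full 30 s chunks plus a 5 s tail (slicing past the end of a NumPy
# array simply returns the shorter remainder).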

def transcribe_long_audio(audio_path, transcriber, chunk_duration=30):
    """Transcribe long audio by splitting into smaller chunks."""
    try:
        # Load the audio file
        audio_data, sr = librosa.load(audio_path, sr=None)
        chunks = split_audio(audio_data, sr, chunk_duration)
        transcriptions = []
        for i, chunk in enumerate(chunks):
            chunk_path = f"temp_chunk_{i}.wav"
            sf.write(chunk_path, chunk, sr)  # Save chunk as WAV
            transcription = transcriber(chunk_path)["text"]
            transcriptions.append(transcription)
            os.remove(chunk_path)  # Cleanup temp files
        return " ".join(transcriptions)
    except Exception as e:
        print(f"Error in transcribe_long_audio: {e}")
        return f"Error processing audio: {e}"

def cleanup_output_dir(max_storage_mb=500):
    """Remove old files if total directory size exceeds max_storage_mb."""
    try:
        total_size = sum(
            os.path.getsize(os.path.join(OUTPUT_DIR, f)) for f in os.listdir(OUTPUT_DIR)
        )
        if total_size > max_storage_mb * 1024 * 1024:
            files = sorted(
                (os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR)),
                key=os.path.getctime,
            )
            for file in files:
                size = os.path.getsize(file)  # Measure before deleting; the file is gone afterwards
                os.remove(file)
                total_size -= size
                if total_size <= max_storage_mb * 1024 * 1024:
                    break
    except Exception as e:
        print(f"Error during cleanup: {e}")

def main():
    device = 0 if torch.cuda.is_available() else -1

    try:
        transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device)
        summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
    except Exception as e:
        print(f"Error loading models: {e}")
        raise

    @spaces.GPU(duration=120)  # ZeroGPU: reserve the GPU per inference call, not for main(), which blocks on launch(); duration is a rough per-call budget in seconds
    def process_audio(audio_input):
        try:
            if isinstance(audio_input, tuple):  # In-memory audio: gr.Audio(type="numpy") yields (sample_rate, data)
                print("Handling in-memory (numpy) audio.")
                sr, audio_data = audio_input  # Gradio puts the sample rate first
                filename = f"recorded_audio_{uuid.uuid4().hex}.wav"
                temp_path = os.path.join(OUTPUT_DIR, filename)
                sf.write(temp_path, audio_data, sr)
            elif isinstance(audio_input, str):  # File path (reached only if the component is switched to type="filepath")
                print("Handling uploaded audio.")
                temp_path = audio_input
            else:
                raise ValueError("Unsupported audio input format.")

            # Transcribe the saved audio file
            transcription = transcribe_long_audio(temp_path, transcriber, chunk_duration=30)
            # BART's encoder is limited to ~1024 tokens, so truncate rather than error on long transcripts
            summary = summarizer(transcription, max_length=50, min_length=10, do_sample=False, truncation=True)[0]["summary_text"]

            # Cleanup old files
            cleanup_output_dir()

            return transcription, summary, temp_path
        except Exception as e:
            print(f"Error in process_audio: {e}")
            return f"Error processing audio: {e}", "", ""

    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column():
                # Enable recording or file upload
                audio_input = gr.Audio(type="numpy", label="Record or Upload Audio")
                process_button = gr.Button("Process Audio")
            with gr.Column():
                transcription_output = gr.Textbox(label="Full Transcription", lines=10)
                summary_output = gr.Textbox(label="Summary", lines=5)
                audio_output = gr.Audio(label="Playback Processed Audio")

        process_button.click(
            process_audio,
            inputs=[audio_input],
            outputs=[transcription_output, summary_output, audio_output]
        )

    interface.launch(share=True)  # share=True only matters for local runs; Spaces ignores it

if __name__ == "__main__":
    main()