|
import base64 |
|
import io |
|
import os |
|
import threading |
|
from dash import Dash, dcc, html, Input, Output, State, callback |
|
import dash_bootstrap_components as dbc |
|
import tempfile |
|
import logging |
|
import openai |
|
from pydub import AudioSegment |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) |
|
|
|
|
|
generated_file = None |
|
transcription_text = "" |
|
|
|
|
|
openai.api_key = os.getenv("OPENAI_API_KEY") |
|
|
|
|
|
app.layout = dbc.Container([ |
|
html.H1("Audio Transcription and Diarization App", className="text-center my-4"), |
|
dbc.Row([ |
|
|
|
dbc.Col([ |
|
dbc.Card([ |
|
dbc.CardBody([ |
|
dcc.Upload( |
|
id='upload-audio', |
|
children=html.Div([ |
|
'Drag and Drop or ', |
|
html.A('Select Audio File') |
|
]), |
|
style={ |
|
'width': '100%', |
|
'height': '60px', |
|
'lineHeight': '60px', |
|
'borderWidth': '1px', |
|
'borderStyle': 'dashed', |
|
'borderRadius': '5px', |
|
'textAlign': 'center', |
|
'margin': '10px' |
|
}, |
|
multiple=False |
|
), |
|
html.Div(id='output-audio-upload'), |
|
dbc.Spinner(html.Div(id='transcription-status'), color="primary", type="grow"), |
|
]) |
|
], className="mb-4") |
|
], md=6), |
|
|
|
dbc.Col([ |
|
dbc.Card([ |
|
dbc.CardBody([ |
|
html.H4("Diarized Transcription Preview", className="card-title"), |
|
html.Div(id='transcription-preview', style={'whiteSpace': 'pre-wrap'}), |
|
html.Br(), |
|
dbc.Button("Download Transcription", id="btn-download", color="primary", className="mt-3", disabled=True), |
|
dcc.Download(id="download-transcription") |
|
]) |
|
]) |
|
], md=6) |
|
]) |
|
], fluid=True) |
|
|
|
def transcribe_and_diarize_audio(contents, filename): |
|
global generated_file, transcription_text |
|
temp_audio_file = None |
|
wav_path = None |
|
try: |
|
content_type, content_string = contents.split(',') |
|
decoded = base64.b64decode(content_string) |
|
|
|
|
|
temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) |
|
temp_audio_file.write(decoded) |
|
temp_audio_file.close() |
|
temp_audio_file_path = temp_audio_file.name |
|
|
|
logger.info(f"File uploaded: {temp_audio_file_path}") |
|
|
|
if filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')): |
|
logger.info("Audio file detected, transcribing with OpenAI") |
|
|
|
|
|
audio = AudioSegment.from_file(temp_audio_file_path) |
|
wav_path = temp_audio_file_path + ".wav" |
|
audio.export(wav_path, format="wav") |
|
|
|
with open(wav_path, "rb") as audio_file: |
|
|
|
transcript = openai.Audio.transcribe("whisper-1", audio_file) |
|
|
|
|
|
audio_file.seek(0) |
|
|
|
|
|
diarized_transcript = openai.Audio.transcribe("whisper-1", audio_file, response_format="verbose_json") |
|
|
|
logger.info(f"OpenAI API Response: {diarized_transcript}") |
|
|
|
|
|
formatted_transcript = "" |
|
if 'segments' in diarized_transcript: |
|
for segment in diarized_transcript["segments"]: |
|
speaker = segment.get('speaker', 'Unknown') |
|
text = segment.get('text', '') |
|
formatted_transcript += f"Speaker {speaker}: {text}\n\n" |
|
else: |
|
|
|
formatted_transcript = transcript.get('text', 'No transcription available.') |
|
|
|
transcription_text = formatted_transcript |
|
logger.info("Transcription and diarization completed successfully") |
|
|
|
|
|
generated_file = io.BytesIO(transcription_text.encode()) |
|
return "Transcription and diarization completed successfully!", True |
|
else: |
|
return "Unsupported file format. Please upload an audio file.", False |
|
except Exception as e: |
|
logger.error(f"Error during transcription and diarization: {str(e)}") |
|
return f"An error occurred during transcription and diarization: {str(e)}", False |
|
finally: |
|
|
|
if temp_audio_file and os.path.exists(temp_audio_file.name): |
|
os.unlink(temp_audio_file.name) |
|
if wav_path and os.path.exists(wav_path): |
|
os.unlink(wav_path) |
|
|
|
@app.callback( |
|
[Output('output-audio-upload', 'children'), |
|
Output('transcription-status', 'children'), |
|
Output('transcription-preview', 'children'), |
|
Output('btn-download', 'disabled')], |
|
[Input('upload-audio', 'contents')], |
|
[State('upload-audio', 'filename')] |
|
) |
|
def update_output(contents, filename): |
|
if contents is None: |
|
return "No file uploaded.", "", "", True |
|
|
|
status_message, success = transcribe_and_diarize_audio(contents, filename) |
|
|
|
if success: |
|
preview = transcription_text[:1000] + "..." if len(transcription_text) > 1000 else transcription_text |
|
return f"File {filename} processed successfully.", status_message, preview, False |
|
else: |
|
return f"File {filename} could not be processed.", status_message, "", True |
|
|
|
@app.callback( |
|
Output("download-transcription", "data"), |
|
Input("btn-download", "n_clicks"), |
|
prevent_initial_call=True, |
|
) |
|
def download_transcription(n_clicks): |
|
if n_clicks is None: |
|
return None |
|
return dcc.send_bytes(generated_file.getvalue(), "diarized_transcription.txt") |
|
|
|
if __name__ == '__main__': |
|
print("Starting the Dash application...") |
|
app.run(debug=True, host='0.0.0.0', port=7860) |
|
print("Dash application has finished running.") |