import base64
import io
import os
from dash import Dash, dcc, html, Input, Output, State
import dash_bootstrap_components as dbc
import tempfile
import logging
import openai
from pydub import AudioSegment
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize the Dash app
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
# Module-level state shared across callbacks. This is not thread-safe,
# so it assumes a single user/session at a time.
generated_file = None
transcription_text = ""
# Set up the OpenAI API key (pre-1.0 openai SDK, module-level interface)
openai.api_key = os.getenv("OPENAI_API_KEY")
# Layout: upload card on the left, transcription preview and download on the right
app.layout = dbc.Container([
html.H1("Audio Transcription and Diarization App", className="text-center my-4"),
dbc.Row([
# Left card for input
dbc.Col([
dbc.Card([
dbc.CardBody([
dcc.Upload(
id='upload-audio',
children=html.Div([
'Drag and Drop or ',
html.A('Select Audio File')
]),
style={
'width': '100%',
'height': '60px',
'lineHeight': '60px',
'borderWidth': '1px',
'borderStyle': 'dashed',
'borderRadius': '5px',
'textAlign': 'center',
'margin': '10px'
},
multiple=False
),
html.Div(id='output-audio-upload'),
dbc.Spinner(html.Div(id='transcription-status'), color="primary", type="grow"),
])
], className="mb-4")
], md=6),
# Right card for output
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H4("Diarized Transcription Preview", className="card-title"),
html.Div(id='transcription-preview', style={'whiteSpace': 'pre-wrap'}),
html.Br(),
dbc.Button("Download Transcription", id="btn-download", color="primary", className="mt-3", disabled=True),
dcc.Download(id="download-transcription")
])
])
], md=6)
])
], fluid=True)
def transcribe_and_diarize_audio(contents, filename):
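    """Decode an uploaded audio file, transcribe it with OpenAI Whisper,
    and store the formatted result in module-level state.

    Returns a (status_message, success) tuple for the upload callback.
    """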
global generated_file, transcription_text
temp_audio_file = None
wav_path = None
try:
        # Dash delivers uploads as a "data:<type>;base64,<payload>" string
        content_type, content_string = contents.split(',', 1)
decoded = base64.b64decode(content_string)
# Create a temporary file that won't be immediately deleted
temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1])
temp_audio_file.write(decoded)
temp_audio_file.close() # Close the file but don't delete it yet
temp_audio_file_path = temp_audio_file.name
logger.info(f"File uploaded: {temp_audio_file_path}")
if filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
logger.info("Audio file detected, transcribing with OpenAI")
# Convert audio to wav format if needed
audio = AudioSegment.from_file(temp_audio_file_path)
wav_path = temp_audio_file_path + ".wav"
audio.export(wav_path, format="wav")
            with open(wav_path, "rb") as audio_file:
                # A single verbose_json request returns both the full text
                # and per-segment details, so one API call is sufficient
                transcript = openai.Audio.transcribe("whisper-1", audio_file, response_format="verbose_json")
            logger.info(f"OpenAI API response received: {len(transcript.get('segments', []))} segments")
            # Format the transcript segment by segment. Note: whisper-1 does
            # not perform speaker diarization, so its segments carry no
            # 'speaker' key and every line falls back to 'Unknown'.
            formatted_transcript = ""
            if 'segments' in transcript:
                for segment in transcript["segments"]:
                    speaker = segment.get('speaker', 'Unknown')
                    text = segment.get('text', '').strip()
                    formatted_transcript += f"Speaker {speaker}: {text}\n\n"
            else:
                # If no segments were returned, use the full transcript text
                formatted_transcript = transcript.get('text', 'No transcription available.')
transcription_text = formatted_transcript
logger.info("Transcription and diarization completed successfully")
# Prepare the transcription for download
generated_file = io.BytesIO(transcription_text.encode())
return "Transcription and diarization completed successfully!", True
else:
return "Unsupported file format. Please upload an audio file.", False
except Exception as e:
logger.error(f"Error during transcription and diarization: {str(e)}")
return f"An error occurred during transcription and diarization: {str(e)}", False
finally:
# Clean up temporary files
if temp_audio_file and os.path.exists(temp_audio_file.name):
os.unlink(temp_audio_file.name)
if wav_path and os.path.exists(wav_path):
os.unlink(wav_path)
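# Upload callback: runs transcription synchronously and populates the
# status message, preview, and download-button state in one pass.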
@app.callback(
[Output('output-audio-upload', 'children'),
Output('transcription-status', 'children'),
Output('transcription-preview', 'children'),
Output('btn-download', 'disabled')],
[Input('upload-audio', 'contents')],
[State('upload-audio', 'filename')]
)
def update_output(contents, filename):
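    """Handle a new upload: transcribe it and refresh the UI outputs."""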
if contents is None:
return "No file uploaded.", "", "", True
status_message, success = transcribe_and_diarize_audio(contents, filename)
if success:
        preview = (transcription_text[:1000] + "...") if len(transcription_text) > 1000 else transcription_text
return f"File {filename} processed successfully.", status_message, preview, False
else:
return f"File {filename} could not be processed.", status_message, "", True
@app.callback(
Output("download-transcription", "data"),
Input("btn-download", "n_clicks"),
prevent_initial_call=True,
)
def download_transcription(n_clicks):
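    """Serve the stored transcription as a downloadable text file."""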
    # Guard against a missing transcription as well as the initial call
    if n_clicks is None or generated_file is None:
        return None
return dcc.send_bytes(generated_file.getvalue(), "diarized_transcription.txt")
if __name__ == '__main__':
    logger.info("Starting the Dash application...")
    app.run(debug=True, host='0.0.0.0', port=7860)
    logger.info("Dash application has finished running.")