import base64 import io import os import threading import tempfile import logging import openai from dash import Dash, dcc, html, Input, Output, State, callback import dash_bootstrap_components as dbc from pydub import AudioSegment import requests from pytube import YouTube import moviepy.editor as mp # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Initialize the Dash app app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) # Global variables generated_file = None transcription_text = "" # Set up OpenAI API key openai.api_key = os.getenv("OPENAI_API_KEY") # Layout app.layout = dbc.Container([ html.H1("Audio/Video Transcription and Diarization App", className="text-center my-4"), dbc.Card([ dbc.CardBody([ dcc.Upload( id='upload-media', children=html.Div([ 'Drag and Drop or ', html.A('Select Audio/Video File') ]), style={ 'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px' }, multiple=False ), html.Div(id='output-media-upload'), dbc.Input(id="url-input", type="text", placeholder="Enter audio/video URL (including YouTube)", className="mb-3"), dbc.Button("Process URL", id="process-url-button", color="primary", className="mb-3"), dbc.Spinner(html.Div(id='transcription-status'), color="primary", type="grow"), html.H4("Diarized Transcription Preview", className="mt-4"), html.Div(id='transcription-preview', style={'whiteSpace': 'pre-wrap'}), html.Br(), dbc.Button("Download Transcription", id="btn-download", color="primary", className="mt-3", disabled=True), dcc.Download(id="download-transcription") ]) ]) ], fluid=True) def process_media(file_path, is_url=False): global generated_file, transcription_text temp_audio_file = None try: if is_url: if 'youtube.com' in file_path or 'youtu.be' in file_path: yt = YouTube(file_path) stream = yt.streams.filter(only_audio=True).first() temp_audio_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') stream.download(output_path=os.path.dirname(temp_audio_file.name), filename=os.path.basename(temp_audio_file.name)) else: response = requests.get(file_path) temp_audio_file = tempfile.NamedTemporaryFile(delete=False) temp_audio_file.write(response.content) temp_audio_file.close() else: temp_audio_file = tempfile.NamedTemporaryFile(delete=False) temp_audio_file.write(file_path) temp_audio_file.close() file_extension = os.path.splitext(temp_audio_file.name)[1].lower() if file_extension in ['.mp4', '.avi', '.mov', '.flv', '.wmv']: video = mp.VideoFileClip(temp_audio_file.name) audio = video.audio wav_path = temp_audio_file.name + ".wav" audio.write_audiofile(wav_path) video.close() elif file_extension in ['.wav', '.mp3', '.ogg', '.flac']: audio = AudioSegment.from_file(temp_audio_file.name) wav_path = temp_audio_file.name + ".wav" audio.export(wav_path, format="wav") else: return "Unsupported file format. Please upload an audio or video file.", False with open(wav_path, "rb") as audio_file: transcript = openai.Audio.transcribe("whisper-1", audio_file) audio_file.seek(0) diarized_transcript = openai.Audio.transcribe("whisper-1", audio_file, response_format="verbose_json") formatted_transcript = "" if 'segments' in diarized_transcript: for segment in diarized_transcript["segments"]: speaker = segment.get('speaker', 'Unknown') text = segment.get('text', '') formatted_transcript += f"Speaker {speaker}: {text}\n\n" else: formatted_transcript = transcript.get('text', 'No transcription available.') transcription_text = formatted_transcript generated_file = io.BytesIO(transcription_text.encode()) return "Transcription and diarization completed successfully!", True except Exception as e: logger.error(f"Error during processing: {str(e)}") return f"An error occurred: {str(e)}", False finally: if temp_audio_file and os.path.exists(temp_audio_file.name): os.unlink(temp_audio_file.name) if 'wav_path' in locals() and os.path.exists(wav_path): os.unlink(wav_path) @app.callback( [Output('output-media-upload', 'children'), Output('transcription-status', 'children'), Output('transcription-preview', 'children'), Output('btn-download', 'disabled')], [Input('upload-media', 'contents'), Input('process-url-button', 'n_clicks')], [State('upload-media', 'filename'), State('url-input', 'value')] ) def update_output(contents, n_clicks, filename, url): ctx = callback_context if not ctx.triggered: return "No file uploaded or URL processed.", "", "", True trigger_id = ctx.triggered[0]['prop_id'].split('.')[0] if trigger_id == 'upload-media' and contents is not None: content_type, content_string = contents.split(',') decoded = base64.b64decode(content_string) status_message, success = process_media(decoded) elif trigger_id == 'process-url-button' and url: status_message, success = process_media(url, is_url=True) else: return "No file uploaded or URL processed.", "", "", True if success: preview = transcription_text[:1000] + "..." if len(transcription_text) > 1000 else transcription_text return f"File processed successfully.", status_message, preview, False else: return "Processing failed.", status_message, "", True @app.callback( Output("download-transcription", "data"), Input("btn-download", "n_clicks"), prevent_initial_call=True, ) def download_transcription(n_clicks): if n_clicks is None: return None return dcc.send_bytes(generated_file.getvalue(), "diarized_transcription.txt") if __name__ == '__main__': print("Starting the Dash application...") app.run(debug=True, host='0.0.0.0', port=7860) print("Dash application has finished running.")