import base64 import io import os import threading import tempfile import logging import openai from dash import Dash, dcc, html, Input, Output, State, callback, callback_context import dash_bootstrap_components as dbc from pydub import AudioSegment import requests import mimetypes import urllib.parse import subprocess import json from tqdm import tqdm # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Try to import moviepy with the simpler import statement try: from moviepy import VideoFileClip, AudioFileClip logger.info("MoviePy (VideoFileClip) successfully imported") except ImportError as e: logger.error(f"Error importing MoviePy (VideoFileClip): {str(e)}") logger.error("Please ensure moviepy is installed correctly") raise # Supported file formats AUDIO_FORMATS = ['.wav', '.mp3', '.ogg', '.flac', '.aac', '.m4a', '.wma'] VIDEO_FORMATS = ['.mp4', '.avi', '.mov', '.flv', '.wmv', '.mkv', '.webm'] SUPPORTED_FORMATS = AUDIO_FORMATS + VIDEO_FORMATS # Initialize the Dash app app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) # Global variables generated_file = None transcription_text = "" # Set up OpenAI API key openai.api_key = os.getenv("OPENAI_API_KEY") # Layout app.layout = dbc.Container([ html.H1("Audio/Video Transcription and Diarization App", className="text-center my-4"), dbc.Card([ dbc.CardBody([ dcc.Upload( id='upload-media', children=html.Div([ 'Drag and Drop or ', html.A('Select Audio/Video File') ]), style={ 'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px' }, multiple=False ), html.Div(id='output-media-upload'), dbc.Input(id="url-input", type="text", placeholder="Enter audio/video URL", className="mb-3"), dbc.Button("Process Media", id="process-url-button", color="primary", className="mb-3"), dbc.Spinner(html.Div(id='transcription-status'), color="primary", type="grow"), html.H4("Diarized Transcription Preview", className="mt-4"), html.Div(id='transcription-preview', style={'whiteSpace': 'pre-wrap'}), html.Br(), dbc.Button("Download Transcription", id="btn-download", color="primary", className="mt-3", disabled=True), dcc.Download(id="download-transcription") ]) ]) ], fluid=True) def chunk_audio(audio_segment, chunk_size_ms=60000): chunks = [] for i in range(0, len(audio_segment), chunk_size_ms): chunks.append(audio_segment[i:i+chunk_size_ms]) return chunks def transcribe_audio_chunks(chunks): transcriptions = [] for i, chunk in enumerate(chunks): logger.info(f"Transcribing chunk {i+1}/{len(chunks)}") with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_audio_file: chunk.export(temp_audio_file.name, format="wav") with open(temp_audio_file.name, 'rb') as audio_file: transcript = openai.Audio.transcribe("whisper-1", audio_file) transcriptions.append(transcript.get('text', '')) os.unlink(temp_audio_file.name) return ' '.join(transcriptions) def download_file(url): with requests.Session() as session: # First, send a GET request to get the final URL after redirects response = session.get(url, allow_redirects=True, stream=True) final_url = response.url logger.info(f"Final URL after redirects: {final_url}") # Get the total file size total_size = int(response.headers.get('content-length', 0)) # Use a default name with .mp4 extension filename = 'downloaded_video.mp4' # Save the content to a temporary file with .mp4 extension with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_file: progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True, desc=filename) for chunk in response.iter_content(chunk_size=8192): size = temp_file.write(chunk) progress_bar.update(size) progress_bar.close() temp_file_path = temp_file.name # Check if the downloaded file size matches the expected size actual_size = os.path.getsize(temp_file_path) if total_size != 0 and actual_size != total_size: logger.error(f"Downloaded file size ({actual_size} bytes) does not match expected size ({total_size} bytes)") raise Exception(f"Incomplete download. Expected {total_size} bytes, got {actual_size} bytes.") logger.info(f"File downloaded and saved as: {temp_file_path}") logger.info(f"File size: {actual_size} bytes") return temp_file_path def get_file_info(file_path): try: result = subprocess.run(['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', file_path], capture_output=True, text=True, check=True) return json.loads(result.stdout) except subprocess.CalledProcessError as e: logger.error(f"Error getting file info: {str(e)}") return None def process_media(file_path, is_url=False): global generated_file, transcription_text temp_file = None wav_path = None try: if is_url: logger.info(f"Processing URL: {file_path}") try: temp_file = download_file(file_path) file_size = os.path.getsize(temp_file) logger.info(f"URL content downloaded: {temp_file} (Size: {file_size} bytes)") if file_size < 1000000: # Less than 1MB raise Exception(f"Downloaded file is too small ({file_size} bytes). Possible incomplete download.") except Exception as e: logger.error(f"Error downloading URL content: {str(e)}") return f"Error downloading URL content: {str(e)}", False # For downloaded files, we know it's an MP4, so we can skip file type determination is_video = True is_audio = False else: # For uploaded files, we still need to determine the file type logger.info("Processing uploaded file") content_type, content_string = file_path.split(',') decoded = base64.b64decode(content_string) temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.tmp') temp_file.write(decoded) temp_file.close() temp_file = temp_file.name logger.info(f"Uploaded file saved: {temp_file}") # Get file info for uploaded files file_info = get_file_info(temp_file) if not file_info: return "Unable to process file: Could not determine file type", False logger.info(f"File info: {json.dumps(file_info, indent=2)}") # Determine if it's audio or video is_audio = any(stream['codec_type'] == 'audio' for stream in file_info['streams']) is_video = any(stream['codec_type'] == 'video' for stream in file_info['streams']) # Convert to WAV using ffmpeg wav_path = tempfile.NamedTemporaryFile(delete=False, suffix='.wav').name try: if is_video: # Extract audio from video cmd = ['ffmpeg', '-y', '-i', temp_file, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', wav_path, '-v', 'verbose'] elif is_audio: # Convert audio to WAV cmd = ['ffmpeg', '-y', '-i', temp_file, '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', wav_path, '-v', 'verbose'] else: return "Unsupported file type: Neither audio nor video detected", False result = subprocess.run(cmd, check=True, capture_output=True, text=True) logger.info(f"FFmpeg command output: {result.stdout}") logger.info(f"Audio extracted to WAV: {wav_path}") except subprocess.CalledProcessError as e: logger.error(f"FFmpeg conversion failed. Error output: {e.stderr}") logger.error(f"FFmpeg command: {e.cmd}") logger.error(f"Return code: {e.returncode}") return f"FFmpeg conversion failed: {e.stderr}", False # Chunk the audio file audio = AudioSegment.from_wav(wav_path) chunks = chunk_audio(audio) logger.info(f"Audio chunked into {len(chunks)} segments") # Transcribe chunks transcription = transcribe_audio_chunks(chunks) logger.info(f"Transcription completed. Total length: {len(transcription)} characters") # Diarization using OpenAI diarization_prompt = f""" The following is a transcription of a conversation. Please identify different speakers and label them as Speaker 1, Speaker 2, etc. Format the output as a series of speaker labels followed by their dialogue. Here's the transcription: {transcription} Please analyze the content and speaking styles to differentiate between speakers. Consider changes in topic, speaking patterns, and any contextual clues that might indicate a change in speaker. """ diarization_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": "You are an AI assistant skilled in analyzing conversations and identifying different speakers."}, {"role": "user", "content": diarization_prompt} ] ) formatted_transcript = diarization_response['choices'][0]['message']['content'] transcription_text = formatted_transcript generated_file = io.BytesIO(transcription_text.encode()) logger.info("Transcription and diarization completed successfully") return "Transcription and diarization completed successfully!", True except Exception as e: logger.error(f"Error during processing: {str(e)}") return f"An error occurred: {str(e)}", False finally: if temp_file and os.path.exists(temp_file): os.unlink(temp_file) if wav_path and os.path.exists(wav_path): os.unlink(wav_path) @app.callback( [Output('output-media-upload', 'children'), Output('transcription-status', 'children'), Output('transcription-preview', 'children'), Output('btn-download', 'disabled')], [Input('upload-media', 'contents'), Input('process-url-button', 'n_clicks')], [State('upload-media', 'filename'), State('url-input', 'value')] ) def update_output(contents, n_clicks, filename, url): ctx = callback_context if not ctx.triggered: return "No file uploaded or URL processed.", "", "", True # Clear the preview pane transcription_preview = "" if contents is not None: status_message, success = process_media(contents) elif url: status_message, success = process_media(url, is_url=True) else: return "No file uploaded or URL processed.", "", "", True if success: preview = transcription_text[:1000] + "..." if len(transcription_text) > 1000 else transcription_text return f"Media processed successfully.", status_message, preview, False else: return "Processing failed.", status_message, transcription_preview, True @app.callback( Output("download-transcription", "data"), Input("btn-download", "n_clicks"), prevent_initial_call=True, ) def download_transcription(n_clicks): if n_clicks is None: return None return dcc.send_bytes(generated_file.getvalue(), "diarized_transcription.txt") if __name__ == '__main__': print("Starting the Dash application...") app.run(debug=True, host='0.0.0.0', port=7860) print("Dash application has finished running.")