import io import os import tempfile import threading import base64 from urllib.parse import urlparse import dash from dash import dcc, html, Input, Output, State import dash_bootstrap_components as dbc from dash.exceptions import PreventUpdate import requests from pytube import YouTube from pydub import AudioSegment import openai # Initialize the Dash app app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) # Retrieve the OpenAI API key from Hugging Face Spaces openai.api_key = os.environ.get("OPENAI_API_KEY") def is_valid_url(url): try: result = urlparse(url) return all([result.scheme, result.netloc]) except ValueError: return False def download_audio(url): if "youtube.com" in url or "youtu.be" in url: yt = YouTube(url) audio_stream = yt.streams.filter(only_audio=True).first() with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: audio_stream.download(output_path=os.path.dirname(temp_file.name), filename=temp_file.name) return temp_file.name else: response = requests.get(url) with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file: temp_file.write(response.content) return temp_file.name def transcribe_audio(file_path): with open(file_path, "rb") as audio_file: transcript = openai.Audio.transcribe("whisper-1", audio_file) return transcript["text"] def process_audio(contents, filename, url): if contents: content_type, content_string = contents.split(',') decoded = base64.b64decode(content_string) with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: temp_file.write(decoded) temp_file_path = temp_file.name elif url: temp_file_path = download_audio(url) else: raise ValueError("No input provided") try: transcript = transcribe_audio(temp_file_path) finally: os.unlink(temp_file_path) return transcript app.layout = dbc.Container([ html.H1("Audio Transcription App", className="text-center my-4"), dbc.Card([ dbc.CardBody([ dcc.Upload( id='upload-audio', children=html.Div([ 'Drag and Drop or ', html.A('Select Audio File') ]), style={ 'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px' }, multiple=False ), dbc.Input(id="audio-url", type="text", placeholder="Enter audio URL or YouTube link", className="my-3"), dbc.Button("Transcribe", id="transcribe-button", color="primary", className="w-100 mb-3"), dbc.Spinner(html.Div(id="transcription-output", className="mt-3")), dbc.Button("Download Transcript", id="download-button", color="secondary", className="w-100 mt-3", style={'display': 'none'}), dcc.Download(id="download-transcript") ]) ]) ]) @app.callback( Output("transcription-output", "children"), Output("download-button", "style"), Input("transcribe-button", "n_clicks"), State("upload-audio", "contents"), State("upload-audio", "filename"), State("audio-url", "value"), prevent_initial_call=True ) def update_transcription(n_clicks, contents, filename, url): if not contents and not url: raise PreventUpdate def transcribe(): try: return process_audio(contents, filename, url) except Exception as e: return f"An error occurred: {str(e)}" thread = threading.Thread(target=transcribe) thread.start() thread.join(timeout=600) # 10 minutes timeout if thread.is_alive(): return "Transcription timed out after 10 minutes", {'display': 'none'} transcript = getattr(thread, 'result', "Transcription failed") if transcript and not transcript.startswith("An error occurred"): return dbc.Card([ dbc.CardBody([ html.H5("Transcription Result"), html.Pre(transcript, style={"white-space": "pre-wrap", "word-wrap": "break-word"}) ]) ]), {'display': 'block'} else: return transcript, {'display': 'none'} @app.callback( Output("download-transcript", "data"), Input("download-button", "n_clicks"), State("transcription-output", "children"), prevent_initial_call=True ) def download_transcript(n_clicks, transcription_output): if not transcription_output: raise PreventUpdate transcript = transcription_output['props']['children'][0]['props']['children'][1]['props']['children'] return dict(content=transcript, filename="transcript.txt") if __name__ == '__main__': print("Starting the Dash application...") app.run(debug=True, host='0.0.0.0', port=7860) print("Dash application has finished running.")