"""Streamlit app for voice-based emotion, sentiment and sarcasm analysis.

Audio (uploaded or recorded) is transcribed with OpenAI Whisper; the
transcript is then scored by a DistilBERT emotion classifier and a RoBERTa
irony classifier, and the results are rendered with Plotly.
"""
import base64
import io
import logging
import os
import tempfile
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import plotly.express as px
import streamlit as st
import streamlit.components.v1 as components
import torch
import transformers
import whisper
from pydub import AudioSegment
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline

# Suppress warnings for a clean console
logging.getLogger("torch").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Check if NumPy is available and usable from PyTorch
try:
    test_array = np.array([1, 2, 3])
    torch.from_numpy(test_array)
except Exception as e:
    st.error(
        f"NumPy is not available or incompatible with PyTorch: {str(e)}. "
        "Ensure 'numpy' is in requirements.txt and reinstall dependencies."
    )
    st.stop()

# Check if CUDA is available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Set Streamlit app layout
st.set_page_config(layout="wide", page_title="Voice Based Sentiment Analysis")

# Interface design
st.title("🎙 Voice Based Sentiment Analysis")
st.write("Detect emotions, sentiment, and sarcasm from your voice with fast and accurate processing.")

EMOTION_MODEL_NAME = "bhadresh-savani/distilbert-base-uncased-emotion"
SARCASM_MODEL_NAME = "cardiffnlp/twitter-roberta-base-irony"

# Emoji shown for each emotion label.  The emotion checkpoint reports
# sadness/joy/love/anger/fear/surprise; "disgust" and "neutral" are kept so
# that results from other label sets still render.
EMOTION_EMOJI = {
    "joy": "😊",
    "love": "❤️",
    "anger": "😡",
    "disgust": "🤢",
    "fear": "😨",
    "sadness": "😭",
    "surprise": "😲",
    "neutral": "😐",
}
POSITIVE_EMOTIONS = ("joy", "love")
NEGATIVE_EMOTIONS = ("anger", "disgust", "fear", "sadness")

# Result returned whenever emotion detection cannot run.
_NEUTRAL_EMOTION_RESULT = ({}, "neutral", {"neutral": "😐"}, "NEUTRAL")

SUPPORTED_AUDIO_EXTENSIONS = ("wav", "mp3", "ogg", "m4a", "flac")


def _write_temp_file(data: bytes, prefix: str, suffix: str) -> str:
    """Write *data* to a uniquely named temporary file and return its path.

    ``mkstemp`` is used instead of a timestamp-based name so that two requests
    arriving within the same second cannot overwrite each other's file.
    """
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix)
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
    return path


def _remove_quietly(path: Optional[str]) -> None:
    """Best-effort removal of a temporary file; failures are only logged."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # already gone, nothing to clean up
    except OSError as e:
        print(f"Could not remove temporary file {path}: {str(e)}")


# Emotion Detection Function with optimizations
@st.cache_resource
def get_emotion_classifier():
    """Load (once per process) the emotion text-classification pipeline.

    Returns:
        The transformers pipeline, or ``None`` if loading failed (an error is
        shown in the UI and the exception is printed to the console).
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            EMOTION_MODEL_NAME, use_fast=True, model_max_length=512
        )
        model = AutoModelForSequenceClassification.from_pretrained(EMOTION_MODEL_NAME)
        model = model.to(device)
        model.eval()
        classifier = pipeline(
            "text-classification",
            model=model,
            tokenizer=tokenizer,
            return_all_scores=True,
            device=0 if torch.cuda.is_available() else -1,
        )
        # Smoke-test the model so a broken download fails here, not mid-request
        test_result = classifier("I am happy today")
        print(f"Emotion classifier test: {test_result}")
        return classifier
    except Exception as e:
        print(f"Error loading emotion model: {str(e)}")
        st.error("Failed to load emotion model. Please check logs.")
        return None


# Cache emotion results
@st.cache_data(ttl=600)
def perform_emotion_detection(text: str) -> Tuple[Dict[str, float], str, Dict[str, str], str]:
    """Score *text* for emotions and derive an overall sentiment.

    Args:
        text: Transcript or user-entered text.

    Returns:
        ``(emotions_dict, top_emotion, emotion_map, sentiment)`` where
        ``emotions_dict`` maps label -> score, ``top_emotion`` is the leading
        label or ``"MIXED"`` when the runner-up is within 80% of the leader,
        ``emotion_map`` maps label -> emoji, and ``sentiment`` is one of
        ``POSITIVE``/``NEGATIVE``/``NEUTRAL``/``MIXED``.  A neutral fallback is
        returned for very short text, a missing model, or any failure.
    """
    try:
        if not text or len(text.strip()) < 3:
            return _NEUTRAL_EMOTION_RESULT

        emotion_classifier = get_emotion_classifier()
        if emotion_classifier is None:
            st.error("Emotion classifier not available.")
            return _NEUTRAL_EMOTION_RESULT

        # Process text directly (skip chunking for speed)
        emotion_results = emotion_classifier(text)

        # Depending on the transformers version, a single input yields either
        # [[{label, score}, ...]] or [{label, score}, ...]; accept both.
        if emotion_results and isinstance(emotion_results[0], list):
            label_scores = emotion_results[0]
        else:
            label_scores = emotion_results
        emotions_dict = {item["label"]: item["score"] for item in label_scores}
        if not emotions_dict:
            return _NEUTRAL_EMOTION_RESULT

        # Drop near-zero scores before ranking; fall back to everything if
        # the filter would leave nothing.
        filtered_emotions = {k: v for k, v in emotions_dict.items() if v > 0.01}
        if not filtered_emotions:
            filtered_emotions = emotions_dict

        # Check for mixed emotions: runner-up within 80% of the leader
        sorted_emotions = sorted(filtered_emotions.items(), key=lambda x: x[1], reverse=True)
        if len(sorted_emotions) > 1 and sorted_emotions[1][1] > 0.8 * sorted_emotions[0][1]:
            top_emotion = "MIXED"
        else:
            top_emotion = sorted_emotions[0][0]

        # Determine sentiment
        if top_emotion == "MIXED":
            sentiment = "MIXED"
        elif top_emotion in POSITIVE_EMOTIONS:
            sentiment = "POSITIVE"
        elif top_emotion in NEGATIVE_EMOTIONS:
            sentiment = "NEGATIVE"
        else:
            sentiment = "NEUTRAL"

        return emotions_dict, top_emotion, dict(EMOTION_EMOJI), sentiment
    except Exception as e:
        st.error(f"Emotion detection failed: {str(e)}")
        print(f"Exception in emotion detection: {str(e)}")
        return _NEUTRAL_EMOTION_RESULT


# Sarcasm Detection Function
@st.cache_resource
def get_sarcasm_classifier():
    """Load (once per process) the irony text-classification pipeline.

    Returns:
        The transformers pipeline, or ``None`` if loading failed.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            SARCASM_MODEL_NAME, use_fast=True, model_max_length=512
        )
        model = AutoModelForSequenceClassification.from_pretrained(SARCASM_MODEL_NAME)
        model = model.to(device)
        model.eval()
        classifier = pipeline(
            "text-classification",
            model=model,
            tokenizer=tokenizer,
            device=0 if torch.cuda.is_available() else -1,
        )
        # Smoke-test the model
        test_result = classifier("This is totally amazing")
        print(f"Sarcasm classifier test: {test_result}")
        return classifier
    except Exception as e:
        print(f"Error loading sarcasm model: {str(e)}")
        st.error("Failed to load sarcasm model. Please check logs.")
        return None


@st.cache_data(ttl=600)
def perform_sarcasm_detection(text: str) -> Tuple[bool, float]:
    """Classify *text* as sarcastic or not.

    Returns:
        ``(is_sarcastic, sarcasm_score)`` where the score is the probability
        of the sarcastic class.  ``(False, 0.0)`` is returned for very short
        text, a missing model, or any failure.
    """
    try:
        if not text or len(text.strip()) < 3:
            return False, 0.0

        sarcasm_classifier = get_sarcasm_classifier()
        if sarcasm_classifier is None:
            st.error("Sarcasm classifier not available.")
            return False, 0.0

        result = sarcasm_classifier(text)[0]
        # NOTE(review): the positive class may be reported as the generic
        # "LABEL_1" or by name ("irony") depending on the checkpoint's
        # id2label config — both are accepted; confirm against the model.
        is_sarcastic = str(result["label"]).lower() in ("label_1", "irony")
        # The pipeline returns only the winning class; convert its score
        # into the probability of the sarcastic class.
        sarcasm_score = result["score"] if is_sarcastic else 1 - result["score"]
        return is_sarcastic, sarcasm_score
    except Exception as e:
        st.error(f"Sarcasm detection failed: {str(e)}")
        return False, 0.0


# Validate audio quality (streamlined for speed)
def validate_audio(audio_path: str) -> bool:
    """Return True if the file decodes and is at least 0.3 s long.

    A warning (too short) or error (undecodable) is shown in the UI when the
    check fails.
    """
    try:
        sound = AudioSegment.from_file(audio_path)
        if len(sound) < 300:  # pydub lengths are in milliseconds; relaxed to 0.3s
            st.warning("Audio is very short. Longer audio provides better analysis.")
            return False
        return True
    except Exception as e:
        st.error(f"Invalid or corrupted audio file: {str(e)}")
        return False


# Speech Recognition with Whisper
@st.cache_resource
def load_whisper_model():
    """Load (once per process) the Whisper "base" model, or None on failure."""
    try:
        return whisper.load_model("base")  # Fastest model for quick transcription
    except Exception as e:
        print(f"Error loading Whisper model: {str(e)}")
        st.error("Failed to load Whisper model. Please check logs.")
        return None


@st.cache_data
def transcribe_audio(audio_path: str) -> str:
    """Transcribe the audio file at *audio_path* to English text.

    The audio is first converted to 16 kHz mono WAV in a temporary file,
    which is always removed afterwards — including when the model is
    unavailable or transcription raises.

    Returns:
        The stripped transcript, or ``""`` on any failure.
    """
    temp_wav_path = None
    try:
        model = load_whisper_model()
        if model is None:
            return ""

        # Convert to WAV format (16kHz, mono) for Whisper
        sound = AudioSegment.from_file(audio_path)
        sound = sound.set_frame_rate(16000).set_channels(1)
        fd, temp_wav_path = tempfile.mkstemp(prefix="temp_converted_", suffix=".wav")
        os.close(fd)  # pydub reopens the path itself
        sound.export(temp_wav_path, format="wav")

        # Transcribe with optimized settings
        result = model.transcribe(
            temp_wav_path,
            language="en",
            task="transcribe",
            fp16=torch.cuda.is_available(),
            beam_size=3,  # Reduced for speed
        )
        return result["text"].strip()
    except Exception as e:
        st.error(f"Transcription failed: {str(e)}")
        return ""
    finally:
        _remove_quietly(temp_wav_path)


# Process uploaded audio files
def process_uploaded_audio(audio_file) -> Optional[str]:
    """Persist an uploaded file to a temporary path for downstream processing.

    Args:
        audio_file: The object returned by ``st.file_uploader`` (needs
            ``.name`` and ``.getvalue()``), or a falsy value.

    Returns:
        Path of the temporary copy (the caller is responsible for deleting
        it), or ``None`` when nothing was uploaded, the extension is not
        supported, or writing failed.
    """
    if not audio_file:
        return None

    temp_file_path = None
    try:
        ext = audio_file.name.split(".")[-1].lower() if "." in audio_file.name else ""
        if ext not in SUPPORTED_AUDIO_EXTENSIONS:
            st.error("Unsupported audio format. Please upload WAV, MP3, OGG, M4A, or FLAC.")
            return None

        temp_file_path = _write_temp_file(
            audio_file.getvalue(), prefix="uploaded_audio_", suffix=f".{ext}"
        )
        # A failed validation is only advisory: processing is still attempted.
        if not validate_audio(temp_file_path):
            st.warning("Audio may not be optimal, but we'll try to process it.")
        return temp_file_path
    except Exception as e:
        _remove_quietly(temp_file_path)
        st.error(f"Error processing uploaded audio: {str(e)}")
        return None


# Show model information
def show_model_info():
    """Render the "About the Models" tabs in the sidebar."""
    st.sidebar.header("🧠 About the Models")
    model_tabs = st.sidebar.tabs(["Emotion", "Sarcasm", "Speech"])

    # The markdown is assembled from unindented lines so that Streamlit does
    # not interpret leading whitespace as a code block.
    with model_tabs[0]:
        st.markdown(
            "*Emotion Model*: distilbert-base-uncased-emotion\n"
            "- Detects sadness, joy, love, anger, fear, surprise\n"
            "- Architecture: DistilBERT base\n"
            "\n"
            "[🔍 Model Hub](https://huggingface.co/bhadresh-savani/distilbert-base-uncased-emotion)"
        )
    with model_tabs[1]:
        st.markdown(
            "*Sarcasm Model*: cardiffnlp/twitter-roberta-base-irony\n"
            "- Trained on Twitter irony dataset\n"
            "- Architecture: RoBERTa base\n"
            "\n"
            "[🔍 Model Hub](https://huggingface.co/cardiffnlp/twitter-roberta-base-irony)"
        )
    with model_tabs[2]:
        st.markdown(
            "*Speech Recognition*: OpenAI Whisper (base model)\n"
            "- Optimized for speed\n"
            "- Handles varied accents\n"
            "\n"
            "*Tips*: Use good mic, reduce noise\n"
            "\n"
            "[🔍 Model Details](https://github.com/openai/whisper)"
        )


# Custom audio recorder
def custom_audio_recorder():
    """Embed the browser recorder widget and return whatever the embed yields.

    NOTE(review): the HTML payload below looks truncated (only a status text
    is present), and ``components.html`` is a static embed — it is not shown
    here to send any recorded data back to Python.  Confirm how the recording
    is meant to reach ``process_base64_audio``.
    """
    st.warning(
        "Browser-based recording requires microphone access. "
        "If recording fails, try uploading an audio file."
    )
    audio_recorder_html = """
Ready to record
"""
    return components.html(audio_recorder_html, height=150)


# Display analysis results
def display_analysis_results(transcribed_text, emotions_dict, top_emotion, emotion_map,
                             sentiment, is_sarcastic, sarcasm_score):
    """Render the transcript, sentiment, sarcasm and emotion chart.

    Args:
        transcribed_text: Text that was analysed.
        emotions_dict: Mapping of emotion label -> score (may be empty).
        top_emotion: Leading label, or ``"MIXED"``.
        emotion_map: Mapping of emotion label -> emoji.
        sentiment: ``POSITIVE``/``NEGATIVE``/``NEUTRAL``/``MIXED``.
        is_sarcastic: Whether sarcasm was detected.
        sarcasm_score: Probability of the sarcastic class.
    """
    # Keep a rolling window of the last 100 debug lines in the session
    st.session_state.debug_info = st.session_state.get("debug_info", [])
    st.session_state.debug_info.append(f"Text: {transcribed_text[:50]}...")
    st.session_state.debug_info.append(
        f"Top emotion: {top_emotion}, Sentiment: {sentiment}, Sarcasm: {is_sarcastic}"
    )
    st.session_state.debug_info = st.session_state.debug_info[-100:]

    st.header("Transcribed Text")
    st.text_area("Text", transcribed_text, height=100, disabled=True)

    # Confidence estimation: a word-count heuristic clamped to [0.75, 0.98],
    # not a value reported by the speech model.
    word_count = len(transcribed_text.split())
    confidence_score = min(0.98, max(0.75, 0.75 + (word_count / 100) * 0.2))
    st.caption(f"Estimated transcription confidence: {confidence_score:.2f}")

    st.header("Analysis Results")
    col1, col2 = st.columns([1, 2])

    with col1:
        st.subheader("Sentiment")
        sentiment_icon = {"POSITIVE": "👍", "NEGATIVE": "👎", "MIXED": "🔄"}.get(sentiment, "😐")
        st.markdown(f"**{sentiment_icon} {sentiment.capitalize()}** (Based on {top_emotion})")

        st.subheader("Sarcasm")
        sarcasm_icon = "😏" if is_sarcastic else "😐"
        sarcasm_text = "Detected" if is_sarcastic else "Not Detected"
        st.markdown(f"**{sarcasm_icon} {sarcasm_text}** (Score: {sarcasm_score:.3f})")

    with col2:
        st.subheader("Emotions")
        if emotions_dict:
            sorted_emotions = sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True)

            if top_emotion in emotions_dict:
                st.markdown(
                    f"*Dominant:* {emotion_map.get(top_emotion, '❓')} {top_emotion.capitalize()} "
                    f"(Score: {emotions_dict[top_emotion]:.3f})"
                )
            else:
                # "MIXED" (or any synthetic label) has no score of its own;
                # show the two leading emotions instead of raising KeyError.
                leaders = " / ".join(
                    f"{emotion_map.get(name, '❓')} {name.capitalize()} ({score:.3f})"
                    for name, score in sorted_emotions[:2]
                )
                st.markdown(f"*Dominant:* 🔄 {top_emotion.capitalize()} — {leaders}")

            significant_emotions = [(e, s) for e, s in sorted_emotions if s > 0.01]
            if significant_emotions:
                emotions = [e[0] for e in significant_emotions]
                scores = [e[1] for e in significant_emotions]
                fig = px.bar(
                    x=emotions,
                    y=scores,
                    labels={"x": "Emotion", "y": "Score"},
                    title="Emotion Distribution",
                    color=emotions,
                    color_discrete_sequence=px.colors.qualitative.Bold,
                )
                fig.update_layout(
                    yaxis_range=[0, 1],
                    showlegend=False,
                    title_font_size=14,
                    margin=dict(l=20, r=20, t=40, b=20),
                    bargap=0.3,
                )
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.write("No significant emotions detected.")
        else:
            st.write("No emotions detected.")

    # Debug expander
    with st.expander("Debug Information", expanded=False):
        st.write("Debugging information:")
        for i, debug_line in enumerate(st.session_state.debug_info[-10:]):
            st.text(f"{i + 1}. {debug_line}")
        if emotions_dict:
            st.write("Raw emotion scores:")
            for emotion, score in sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True):
                if score > 0.01:
                    st.text(f"{emotion}: {score:.4f}")


# Process base64 audio data
def process_base64_audio(base64_data):
    """Decode a ``data:`` URL holding recorded audio into a temporary file.

    Args:
        base64_data: String of the form ``data:<mime>;base64,<payload>``.

    Returns:
        Path of the temporary file (the caller is responsible for deleting
        it), or ``None`` if the input is not a valid data URL or decoding
        fails.
    """
    temp_file_path = None
    try:
        if not base64_data or not isinstance(base64_data, str) or not base64_data.startswith("data:"):
            st.error("Invalid audio data received")
            return None

        # Everything after the first comma is the base64 payload
        _header, separator, payload = base64_data.partition(",")
        if not separator or not payload:
            st.error("Invalid audio data received")
            return None

        binary_data = base64.b64decode(payload)
        temp_file_path = _write_temp_file(binary_data, prefix="recording_", suffix=".wav")

        if not validate_audio(temp_file_path):
            st.warning("Audio quality may not be optimal, but we'll try to process it.")
        return temp_file_path
    except Exception as e:
        _remove_quietly(temp_file_path)
        st.error(f"Error processing audio data: {str(e)}")
        return None


# Preload models in background
def preload_models():
    """Start loading all three models on background threads.

    The loaders are ``st.cache_resource`` functions, so later foreground
    calls reuse whatever these threads have loaded.
    """
    threading.Thread(target=load_whisper_model).start()
    threading.Thread(target=get_emotion_classifier).start()
    threading.Thread(target=get_sarcasm_classifier).start()


def _run_text_models(text: str):
    """Run emotion and sarcasm detection on *text* concurrently.

    Returns:
        ``(emotions_dict, top_emotion, emotion_map, sentiment, is_sarcastic,
        sarcasm_score)`` — the arguments ``display_analysis_results`` expects
        after the text itself.
    """
    with ThreadPoolExecutor(max_workers=2) as executor:
        emotion_future = executor.submit(perform_emotion_detection, text)
        sarcasm_future = executor.submit(perform_sarcasm_detection, text)
        emotions_dict, top_emotion, emotion_map, sentiment = emotion_future.result()
        is_sarcastic, sarcasm_score = sarcasm_future.result()
    return emotions_dict, top_emotion, emotion_map, sentiment, is_sarcastic, sarcasm_score


def _analyze_audio_path(temp_audio_path: str, progress_bar, start_percent: int,
                        empty_transcript_message: str) -> None:
    """Transcribe, classify and display one temporary audio file.

    The classifiers need the transcript, so transcription runs first and only
    the two text models run in parallel.  The temporary file is removed even
    if analysis raises.
    """
    try:
        progress_bar.progress(start_percent, text="Processing in parallel...")
        transcribed_text = transcribe_audio(temp_audio_path)
        if transcribed_text:
            results = _run_text_models(transcribed_text)
            progress_bar.progress(90, text="Finalizing results...")
            display_analysis_results(transcribed_text, *results)
        else:
            progress_bar.progress(90, text="Finalizing results...")
            st.error(empty_transcript_message)
        progress_bar.progress(100, text="Analysis complete!")
    finally:
        _remove_quietly(temp_audio_path)


# Main App Logic
def main():
    """Build the page: upload tab, record tab, manual text input and sidebar."""
    if "debug_info" not in st.session_state:
        st.session_state.debug_info = []
    if "models_loaded" not in st.session_state:
        st.session_state.models_loaded = False

    # Kick off model loading once per session
    if not st.session_state.models_loaded:
        preload_models()
        st.session_state.models_loaded = True

    tab1, tab2 = st.tabs(["📁 Upload Audio", "🎙 Record Audio"])

    with tab1:
        st.header("Upload an Audio File")
        audio_file = st.file_uploader(
            "Choose an audio file", type=["wav", "mp3", "ogg", "m4a", "flac"]
        )
        if audio_file:
            st.audio(audio_file.getvalue())
            upload_button = st.button("Analyze Upload", key="analyze_upload")
            if upload_button:
                progress_bar = st.progress(0, text="Preparing audio...")
                temp_audio_path = process_uploaded_audio(audio_file)
                if temp_audio_path:
                    _analyze_audio_path(
                        temp_audio_path,
                        progress_bar,
                        25,
                        "Could not transcribe the audio. Try clearer audio.",
                    )
                else:
                    st.error("Could not process the audio file.")

    with tab2:
        st.header("Record Your Voice")
        audio_data = custom_audio_recorder()
        if audio_data:
            analyze_rec_button = st.button("Analyze Recording", key="analyze_rec")
            if analyze_rec_button:
                progress_bar = st.progress(0, text="Processing recording...")
                temp_audio_path = process_base64_audio(audio_data)
                if temp_audio_path:
                    _analyze_audio_path(
                        temp_audio_path,
                        progress_bar,
                        30,
                        "Could not transcribe the audio. Speak clearly.",
                    )
                else:
                    st.error("Could not process the recording.")

        # NOTE(review): the original layout was lost with the file's
        # whitespace; the manual input is assumed to belong to the record tab
        # — confirm.
        st.subheader("Manual Text Input")
        manual_text = st.text_area("Enter text to analyze:", placeholder="Type text to analyze...")
        analyze_text_button = st.button("Analyze Text", key="analyze_manual")
        if analyze_text_button and manual_text:
            display_analysis_results(manual_text, *_run_text_models(manual_text))

    show_model_info()
    st.sidebar.markdown("---")
    st.sidebar.caption("Voice Sentiment Analysis v2.1")
    st.sidebar.caption("Optimized for speed and accuracy")


if __name__ == "__main__":
    main()