import os
import streamlit as st
import tempfile
import torch
import transformers
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import plotly.express as px
import logging
import warnings
import whisper
from pydub import AudioSegment
import time
import base64
import io
import streamlit.components.v1 as components
import functools
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Tuple, List, Any, Optional, Union

logging.getLogger("torch").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

st.set_page_config(layout="wide", page_title="Voice Based Sentiment Analysis")

st.title("🎤 Voice Based Sentiment Analysis")
st.write("Detect emotions, sentiment, and sarcasm from your voice with state-of-the-art accuracy using OpenAI Whisper.")

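# Caching overview: the model loaders below are wrapped in st.cache_resource, so each model
# is built once per process and shared across reruns, while the text-level analysis helpers
# use st.cache_data(ttl=600) to memoize results per input text for ten minutes.
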
@st.cache_resource
def get_emotion_classifier():
    try:
        tokenizer = AutoTokenizer.from_pretrained("bhadresh-savani/distilbert-base-uncased-emotion",
                                                  use_fast=True,
                                                  model_max_length=512)
        model = AutoModelForSequenceClassification.from_pretrained("bhadresh-savani/distilbert-base-uncased-emotion")
        model = model.to(device)
        model.eval()

        classifier = pipeline("text-classification",
                              model=model,
                              tokenizer=tokenizer,
                              top_k=None,
                              device=0 if torch.cuda.is_available() else -1)

        test_result = classifier("I am happy today")
        print(f"Emotion classifier test: {test_result}")

        return classifier
    except Exception as e:
        print(f"Error loading emotion model: {str(e)}")
        st.error("Failed to load emotion model. Please check logs.")
        return None

@st.cache_data(ttl=600)
def perform_emotion_detection(text: str) -> Tuple[Dict[str, float], str, Dict[str, str], str]:
    try:
        if not text or len(text.strip()) < 3:
            return {}, "neutral", {"neutral": "😐"}, "NEUTRAL"

        emotion_classifier = get_emotion_classifier()
        if emotion_classifier is None:
            st.error("Emotion classifier not available.")
            return {}, "neutral", {"neutral": "😐"}, "NEUTRAL"

        # Long inputs are split into 512-character chunks; only the top result for each
        # chunk is retained before the per-label scores are merged below.
        max_chunk_size = 512
        if len(text) > max_chunk_size:
            chunks = [text[i:i + max_chunk_size] for i in range(0, len(text), max_chunk_size)]
            all_results = []
            for chunk in chunks:
                chunk_results = emotion_classifier(chunk)
                all_results.extend(chunk_results)

            emotion_results = [result[0] for result in all_results]
        else:
            emotion_results = emotion_classifier(text)[0]

        emotion_map = {
            "joy": "😊", "anger": "😡", "disgust": "🤢", "fear": "😨",
            "sadness": "😔", "surprise": "😲", "neutral": "😐"
        }

        positive_emotions = ["joy"]
        negative_emotions = ["anger", "disgust", "fear", "sadness"]
        neutral_emotions = ["surprise", "neutral"]

        # Merge scores per label; labels repeated across chunks are averaged.
        emotions_dict = {}
        for result in emotion_results:
            if isinstance(result, dict) and 'label' in result and 'score' in result:
                if result['label'] in emotions_dict:
                    emotions_dict[result['label']] = (emotions_dict[result['label']] + result['score']) / 2
                else:
                    emotions_dict[result['label']] = result['score']
            else:
                print(f"Invalid result format: {result}")

        if not emotions_dict:
            st.error("No valid emotions detected.")
            return {}, "neutral", emotion_map, "NEUTRAL"

        filtered_emotions = {k: v for k, v in emotions_dict.items() if v > 0.05}

        if not filtered_emotions:
            filtered_emotions = emotions_dict

        top_emotion = max(filtered_emotions, key=filtered_emotions.get)
        top_score = filtered_emotions[top_emotion]

        # Derive an overall sentiment from the dominant emotion; when the top emotion is
        # neutral-leaning, a close runner-up can tip the result to POSITIVE, NEGATIVE or MIXED.
        if top_emotion in positive_emotions:
            sentiment = "POSITIVE"
        elif top_emotion in negative_emotions:
            sentiment = "NEGATIVE"
        else:
            competing_emotions = sorted(filtered_emotions.items(), key=lambda x: x[1], reverse=True)[:3]

            if len(competing_emotions) > 1:
                if competing_emotions[1][1] > 0.8 * competing_emotions[0][1]:
                    second_emotion = competing_emotions[1][0]
                    if second_emotion in positive_emotions:
                        sentiment = "POSITIVE" if top_emotion not in negative_emotions else "MIXED"
                    elif second_emotion in negative_emotions:
                        sentiment = "NEGATIVE" if top_emotion not in positive_emotions else "MIXED"
                    else:
                        sentiment = "NEUTRAL"
                else:
                    sentiment = "NEUTRAL"
            else:
                sentiment = "NEUTRAL"

        return emotions_dict, top_emotion, emotion_map, sentiment
    except Exception as e:
        st.error(f"Emotion detection failed: {str(e)}")
        print(f"Exception in emotion detection: {str(e)}")
        return {}, "neutral", {"neutral": "😐"}, "NEUTRAL"

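# Illustrative usage (hypothetical scores, for documentation only):
#   emotions, top, emoji_map, sentiment = perform_emotion_detection("I am thrilled with this!")
#   # emotions might look like {"joy": 0.97, "surprise": 0.02, ...}, with top == "joy" and
#   # sentiment == "POSITIVE"; exact numbers depend on the model version.
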
@st.cache_resource
def get_sarcasm_classifier():
    try:
        tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-irony",
                                                  use_fast=True,
                                                  model_max_length=512)
        model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-irony")
        model = model.to(device)
        model.eval()

        classifier = pipeline("text-classification",
                              model=model,
                              tokenizer=tokenizer,
                              device=0 if torch.cuda.is_available() else -1)

        test_result = classifier("This is totally amazing")
        print(f"Sarcasm classifier test: {test_result}")

        return classifier
    except Exception as e:
        print(f"Error loading sarcasm model: {str(e)}")
        st.error("Failed to load sarcasm model. Please check logs.")
        return None

@st.cache_data(ttl=600)
def perform_sarcasm_detection(text: str) -> Tuple[bool, float]:
    try:
        if not text or len(text.strip()) < 3:
            return False, 0.0

        sarcasm_classifier = get_sarcasm_classifier()
        if sarcasm_classifier is None:
            st.error("Sarcasm classifier not available.")
            return False, 0.0

        # For this irony model, "LABEL_1" is the irony class; the score is mapped so that it
        # always reflects the probability of sarcasm/irony.
        max_chunk_size = 512
        if len(text) > max_chunk_size:
            chunks = [text[i:i + max_chunk_size] for i in range(0, len(text), max_chunk_size)]

            sarcasm_scores = []
            for chunk in chunks:
                result = sarcasm_classifier(chunk)[0]
                is_chunk_sarcastic = result['label'] == "LABEL_1"
                sarcasm_score = result['score'] if is_chunk_sarcastic else 1 - result['score']
                sarcasm_scores.append((is_chunk_sarcastic, sarcasm_score))

            total_sarcasm_score = sum(score for _, score in sarcasm_scores)
            avg_sarcasm_score = total_sarcasm_score / len(sarcasm_scores)

            sarcastic_chunks = sum(1 for is_sarcastic, _ in sarcasm_scores if is_sarcastic)

            # The whole text is labelled sarcastic if a majority of chunks are.
            is_sarcastic = sarcastic_chunks > len(chunks) / 2
            return is_sarcastic, avg_sarcasm_score
        else:
            result = sarcasm_classifier(text)[0]
            is_sarcastic = result['label'] == "LABEL_1"
            sarcasm_score = result['score'] if is_sarcastic else 1 - result['score']
            return is_sarcastic, sarcasm_score
    except Exception as e:
        st.error(f"Sarcasm detection failed: {str(e)}")
        return False, 0.0

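# Illustrative usage (hypothetical output):
#   is_sarcastic, score = perform_sarcasm_detection("Oh great, another Monday.")
#   # might return (True, 0.78); the score approximates the probability of the irony label.
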
def validate_audio(audio_path: str) -> bool:
    try:
        sound = AudioSegment.from_file(audio_path)

        if sound.dBFS < -50:
            st.warning("Audio volume is low. Please record or upload a louder audio for better results.")
            return len(sound) > 500
        if len(sound) < 500:
            st.warning("Audio is very short. Longer audio provides better analysis.")
            return False
        return True
    except Exception as e:
        st.error(f"Invalid or corrupted audio file: {str(e)}")
        return False

@st.cache_resource
def load_whisper_model():
    try:
        # "medium" trades some speed for accuracy; smaller checkpoints ("base", "small")
        # load and run faster if resources are limited.
        model = whisper.load_model("medium")
        return model
    except Exception as e:
        print(f"Error loading Whisper model: {str(e)}")
        st.error("Failed to load Whisper model. Please check logs.")
        return None

@st.cache_data
def transcribe_audio(audio_path: str, show_alternative: bool = False) -> Union[str, Tuple[str, List[str]]]:
    try:
        st.write("Processing audio file...")
        sound = AudioSegment.from_file(audio_path)
        st.write(f"Audio duration: {len(sound) / 1000:.2f}s")

        # Convert to 16 kHz mono WAV, the format Whisper works with.
        temp_wav_path = os.path.join(tempfile.gettempdir(), f"temp_converted_{int(time.time())}.wav")

        sound = sound.set_frame_rate(16000)
        sound = sound.set_channels(1)
        sound.export(temp_wav_path, format="wav")

        model = load_whisper_model()
        if model is None:
            return ("", []) if show_alternative else ""

        result = model.transcribe(
            temp_wav_path,
            language="en",
            task="transcribe",
            fp16=torch.cuda.is_available(),
            beam_size=5
        )

        main_text = result["text"].strip()

        if os.path.exists(temp_wav_path):
            os.remove(temp_wav_path)

        if show_alternative and "segments" in result:
            segments = result["segments"]
            if len(segments) > 1:
                alternatives = []

                # Build alternative readings by dropping segments that are likely non-speech.
                for conf in [0.5, 0.7, 0.9]:
                    alt_text = " ".join(seg["text"] for seg in segments if seg["no_speech_prob"] < conf)
                    if alt_text and alt_text != main_text:
                        alternatives.append(alt_text)
                return main_text, alternatives[:3]

        return (main_text, []) if show_alternative else main_text
    except Exception as e:
        st.error(f"Transcription failed: {str(e)}")
        return ("", []) if show_alternative else ""

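# Illustrative usage (paths are placeholders):
#   text = transcribe_audio("clip.wav")                               # plain transcript
#   text, alternatives = transcribe_audio("clip.wav", show_alternative=True)
# The alternatives are filtered re-joins of Whisper segments, not true n-best hypotheses.
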
def process_uploaded_audio(audio_file) -> Optional[str]:
    if not audio_file:
        return None

    try:
        temp_dir = tempfile.gettempdir()

        filename = audio_file.name
        ext = filename.split('.')[-1].lower() if '.' in filename else ''

        if ext not in ['wav', 'mp3', 'ogg', 'm4a', 'flac']:
            st.error("Unsupported audio format. Please upload WAV, MP3, OGG, M4A, or FLAC.")
            return None

        temp_file_path = os.path.join(temp_dir, f"uploaded_audio_{int(time.time())}.{ext}")

        with open(temp_file_path, "wb") as f:
            f.write(audio_file.getvalue())

        if not validate_audio(temp_file_path):
            st.warning("Audio may not be optimal quality, but we'll try to process it anyway.")

        return temp_file_path
    except Exception as e:
        st.error(f"Error processing uploaded audio: {str(e)}")
        return None

def show_model_info():
    st.sidebar.header("🧠 About the Models")

    model_tabs = st.sidebar.tabs(["Emotion", "Sarcasm", "Speech"])

    with model_tabs[0]:
        st.markdown("""
        *Emotion Model*: distilbert-base-uncased-emotion
        - Fine-tuned for six emotions (joy, anger, disgust, fear, sadness, surprise)
        - Architecture: DistilBERT base
        - High accuracy for basic emotion classification
        [🔗 Model Hub](https://huggingface.co/bhadresh-savani/distilbert-base-uncased-emotion)
        """)

    with model_tabs[1]:
        st.markdown("""
        *Sarcasm Model*: cardiffnlp/twitter-roberta-base-irony
        - Trained on SemEval-2018 Task 3 (Twitter irony dataset)
        - Architecture: RoBERTa base
        - F1-score: 0.705
        [🔗 Model Hub](https://huggingface.co/cardiffnlp/twitter-roberta-base-irony)
        """)

    with model_tabs[2]:
        st.markdown("""
        *Speech Recognition*: OpenAI Whisper (medium model)
        - Optimized for speed and accuracy
        - Performs well even with background noise and varied accents
        - Runs locally, no internet required
        *Tips*: Use a good microphone, reduce background noise, and speak clearly
        [🔗 Model Details](https://github.com/openai/whisper)
        """)

def custom_audio_recorder():
    st.warning("Browser-based recording requires microphone access and a modern browser. If recording fails, try uploading an audio file instead.")
    audio_recorder_html = """
    <script>
    var audioRecorder = {
        audioBlobs: [],
        mediaRecorder: null,
        streamBeingCaptured: null,
        isRecording: false,
        recordingTimer: null,
        recordingDuration: 0,

        start: function() {
            if (!(navigator.mediaDevices && navigator.mediaDevices.getUserMedia)) {
                document.getElementById('status-message').textContent = "Recording not supported in this browser";
                return Promise.reject(new Error('mediaDevices API or getUserMedia method is not supported in this browser.'));
            }
            else {
                return navigator.mediaDevices.getUserMedia({
                    audio: {
                        echoCancellation: true,
                        noiseSuppression: true,
                        autoGainControl: true
                    }
                })
                .then(stream => {
                    audioRecorder.streamBeingCaptured = stream;

                    // Create audio context for visualization
                    const audioContext = new (window.AudioContext || window.webkitAudioContext)();
                    const source = audioContext.createMediaStreamSource(stream);
                    const analyser = audioContext.createAnalyser();
                    analyser.fftSize = 256;
                    source.connect(analyser);

                    // Start monitoring audio levels
                    const bufferLength = analyser.frequencyBinCount;
                    const dataArray = new Uint8Array(bufferLength);

                    function updateMeter() {
                        if (!audioRecorder.isRecording) return;

                        analyser.getByteFrequencyData(dataArray);
                        let sum = 0;
                        for (let i = 0; i < bufferLength; i++) {
                            sum += dataArray[i];
                        }
                        const average = sum / bufferLength;

                        // Update volume meter
                        const meter = document.getElementById('volume-meter');
                        if (meter) {
                            const height = Math.min(100, average * 2);
                            meter.style.height = height + '%';
                        }

                        requestAnimationFrame(updateMeter);
                    }

                    // Setup media recorder with better settings
                    audioRecorder.mediaRecorder = new MediaRecorder(stream, {
                        mimeType: 'audio/webm;codecs=opus',
                        audioBitsPerSecond: 128000
                    });

                    audioRecorder.audioBlobs = [];
                    audioRecorder.mediaRecorder.addEventListener("dataavailable", event => {
                        audioRecorder.audioBlobs.push(event.data);
                    });

                    // Start the recording and visualization
                    audioRecorder.mediaRecorder.start(100);
                    audioRecorder.isRecording = true;

                    // Start timer
                    audioRecorder.recordingDuration = 0;
                    audioRecorder.recordingTimer = setInterval(() => {
                        audioRecorder.recordingDuration += 1;
                        const timerDisplay = document.getElementById('recording-timer');
                        if (timerDisplay) {
                            const minutes = Math.floor(audioRecorder.recordingDuration / 60);
                            const seconds = audioRecorder.recordingDuration % 60;
                            timerDisplay.textContent = `${minutes.toString().padStart(2, '0')}:${seconds.toString().padStart(2, '0')}`;
                        }
                    }, 1000);

                    updateMeter();
                    document.getElementById('status-message').textContent = "Recording...";
                });
            }
        },

        stop: function() {
            return new Promise(resolve => {
                let mimeType = audioRecorder.mediaRecorder.mimeType;

                audioRecorder.mediaRecorder.addEventListener("stop", () => {
                    let audioBlob = new Blob(audioRecorder.audioBlobs, { type: mimeType });
                    resolve(audioBlob);
                    audioRecorder.isRecording = false;
                    document.getElementById('status-message').textContent = "Recording stopped";

                    // Stop the timer
                    if (audioRecorder.recordingTimer) {
                        clearInterval(audioRecorder.recordingTimer);
                    }
                });

                audioRecorder.mediaRecorder.stop();
                audioRecorder.stopStream();
                audioRecorder.resetRecordingProperties();
            });
        },

        stopStream: function() {
            audioRecorder.streamBeingCaptured.getTracks()
                .forEach(track => track.stop());
        },

        resetRecordingProperties: function() {
            audioRecorder.mediaRecorder = null;
            audioRecorder.streamBeingCaptured = null;
        }
    }

    var isRecording = false;

    function toggleRecording() {
        var recordButton = document.getElementById('record-button');
        var statusMessage = document.getElementById('status-message');
        var volumeMeter = document.getElementById('volume-meter');
        var recordingTimer = document.getElementById('recording-timer');

        if (!isRecording) {
            audioRecorder.start()
                .then(() => {
                    isRecording = true;
                    recordButton.textContent = 'Stop Recording';
                    recordButton.classList.add('recording');
                    volumeMeter.style.display = 'block';
                    recordingTimer.style.display = 'block';
                })
                .catch(error => {
                    statusMessage.textContent = 'Error: ' + error.message;
                });
        } else {
            audioRecorder.stop()
                .then(audioBlob => {
                    const audioUrl = URL.createObjectURL(audioBlob);
                    var audioElement = document.getElementById('audio-playback');
                    audioElement.src = audioUrl;
                    audioElement.style.display = 'block';

                    const reader = new FileReader();
                    reader.readAsDataURL(audioBlob);
                    reader.onloadend = function() {
                        const base64data = reader.result;
                        var audioData = document.getElementById('audio-data');
                        audioData.value = base64data;
                        const streamlitMessage = {type: "streamlit:setComponentValue", value: base64data};
                        window.parent.postMessage(streamlitMessage, "*");
                    }

                    isRecording = false;
                    recordButton.textContent = 'Start Recording';
                    recordButton.classList.remove('recording');
                    volumeMeter.style.display = 'none';
                    volumeMeter.style.height = '0%';
                });
        }
    }

    document.addEventListener('DOMContentLoaded', function() {
        var recordButton = document.getElementById('record-button');
        recordButton.addEventListener('click', toggleRecording);
    });
    </script>

    <div class="audio-recorder-container">
        <button id="record-button" class="record-button">Start Recording</button>
        <div id="status-message" class="status-message">Ready to record</div>

        <div class="recording-info">
            <div class="volume-meter-container">
                <div id="volume-meter" class="volume-meter"></div>
            </div>
            <div id="recording-timer" class="recording-timer">00:00</div>
        </div>

        <audio id="audio-playback" controls style="display:none; margin-top:10px; width:100%;"></audio>
        <input type="hidden" id="audio-data" name="audio-data">
    </div>

    <style>
    .audio-recorder-container {
        display: flex;
        flex-direction: column;
        align-items: center;
        padding: 15px;
        border-radius: 8px;
        background-color: #f7f7f7;
        box-shadow: 0 2px 5px rgba(0,0,0,0.1);
    }

    .record-button {
        background-color: #f63366;
        color: white;
        border: none;
        padding: 12px 24px;
        border-radius: 24px;
        cursor: pointer;
        font-size: 16px;
        font-weight: bold;
        transition: all 0.3s ease;
        box-shadow: 0 2px 5px rgba(0,0,0,0.2);
    }

    .record-button:hover {
        background-color: #e62958;
        transform: translateY(-2px);
    }

    .record-button.recording {
        background-color: #ff0000;
        animation: pulse 1.5s infinite;
    }

    .status-message {
        margin-top: 10px;
        font-size: 14px;
        color: #666;
    }

    .recording-info {
        display: flex;
        align-items: center;
        margin-top: 15px;
        width: 100%;
        justify-content: center;
    }

    .volume-meter-container {
        width: 20px;
        height: 60px;
        background-color: #ddd;
        border-radius: 3px;
        overflow: hidden;
        position: relative;
    }

    .volume-meter {
        width: 100%;
        height: 0%;
        background-color: #f63366;
        position: absolute;
        bottom: 0;
        transition: height 0.1s ease;
        display: none;
    }

    .recording-timer {
        margin-left: 15px;
        font-family: monospace;
        font-size: 18px;
        color: #f63366;
        display: none;
    }

    @keyframes pulse {
        0% { opacity: 1; box-shadow: 0 0 0 0 rgba(255,0,0,0.7); }
        50% { opacity: 0.8; box-shadow: 0 0 0 10px rgba(255,0,0,0); }
        100% { opacity: 1; box-shadow: 0 0 0 0 rgba(255,0,0,0); }
    }
    </style>
    """

    return components.html(audio_recorder_html, height=220)

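# NOTE on custom_audio_recorder() above: components.html renders a static component, so the
# postMessage call in the embedded JavaScript may not deliver the base64 audio back to Python
# in every Streamlit version; a bidirectional custom component would be needed for that.
# The manual text input in the "Record Audio" tab serves as a fallback when recording
# cannot be captured.
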
def display_analysis_results(transcribed_text):
    st.session_state.debug_info = st.session_state.get('debug_info', [])
    st.session_state.debug_info.append(f"Processing text: {transcribed_text[:50]}...")
    st.session_state.debug_info = st.session_state.debug_info[-100:]

    # Run emotion and sarcasm analysis in parallel.
    with ThreadPoolExecutor(max_workers=2) as executor:
        emotion_future = executor.submit(perform_emotion_detection, transcribed_text)
        sarcasm_future = executor.submit(perform_sarcasm_detection, transcribed_text)

        emotions_dict, top_emotion, emotion_map, sentiment = emotion_future.result()
        is_sarcastic, sarcasm_score = sarcasm_future.result()

    st.session_state.debug_info.append(f"Top emotion: {top_emotion}, Sentiment: {sentiment}")
    st.session_state.debug_info.append(f"Sarcasm: {is_sarcastic}, Score: {sarcasm_score:.3f}")

    st.header("Transcribed Text")
    st.text_area("Text", transcribed_text, height=120, disabled=True,
                 help="The audio converted to text. The text was processed for emotion and sentiment analysis.")

    # Rough heuristic based on word count; this is not a real ASR confidence value.
    words = transcribed_text.split()
    word_count = len(words)
    confidence_score = min(0.98, max(0.75, 0.75 + (word_count / 100) * 0.2))

    st.caption(f"Estimated transcription confidence: {confidence_score:.2f}")

    st.header("Analysis Results")
    col1, col2 = st.columns([1, 2])

    with col1:
        st.subheader("Sentiment")
        sentiment_icon = "😊" if sentiment == "POSITIVE" else "😔" if sentiment == "NEGATIVE" else "😕" if sentiment == "MIXED" else "😐"
        st.markdown(f"**{sentiment_icon} {sentiment.capitalize()}** (Based on {top_emotion})")
        st.info("Sentiment reflects the dominant emotion's tone and context.")

        st.subheader("Sarcasm")
        sarcasm_icon = "😏" if is_sarcastic else "😐"
        sarcasm_text = "Detected" if is_sarcastic else "Not Detected"
        st.markdown(f"**{sarcasm_icon} {sarcasm_text}** (Score: {sarcasm_score:.3f})")

        if is_sarcastic:
            if sarcasm_score > 0.8:
                st.info("High confidence in sarcasm detection.")
            else:
                st.info("Moderate confidence in sarcasm detection.")
        else:
            st.info("No clear indicators of sarcasm found.")

    with col2:
        st.subheader("Emotions")
        if emotions_dict:
            st.markdown(
                f"*Dominant:* {emotion_map.get(top_emotion, '❓')} {top_emotion.capitalize()} (Score: {emotions_dict[top_emotion]:.3f})")

            sorted_emotions = sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True)
            significant_emotions = [(e, s) for e, s in sorted_emotions if s > 0.05]

            if significant_emotions:
                emotions = [e[0] for e in significant_emotions]
                scores = [e[1] for e in significant_emotions]

                fig = px.bar(x=emotions, y=scores, labels={'x': 'Emotion', 'y': 'Score'},
                             title="Emotion Distribution", color=emotions,
                             color_discrete_sequence=px.colors.qualitative.Bold)

                fig.update_layout(
                    yaxis_range=[0, 1],
                    showlegend=False,
                    title_font_size=14,
                    margin=dict(l=20, r=20, t=40, b=20),
                    xaxis_title="Emotion",
                    yaxis_title="Confidence Score",
                    bargap=0.3
                )

                # Dotted reference line at the 0.1 score level.
                fig.add_shape(
                    type="line",
                    x0=-0.5,
                    x1=len(emotions) - 0.5,
                    y0=0.1,
                    y1=0.1,
                    line=dict(color="gray", width=1, dash="dot")
                )

                st.plotly_chart(fig, use_container_width=True)
            else:
                st.write("No significant emotions detected.")
        else:
            st.write("No emotions detected.")

    with st.expander("Expert Analysis", expanded=False):
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("Emotion Insights")

            if emotions_dict:
                top_emotions = sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True)[:3]

                if len(top_emotions) >= 2:
                    emotion1, score1 = top_emotions[0]
                    emotion2, score2 = top_emotions[1]

                    if score2 > 0.7 * score1:
                        st.markdown(f"**Mixed emotional state detected:** {emotion_map.get(emotion1, '')} {emotion1} + {emotion_map.get(emotion2, '')} {emotion2}")

                        if (emotion1 == "joy" and emotion2 == "surprise") or (emotion1 == "surprise" and emotion2 == "joy"):
                            st.write("💡 This indicates excitement or delight")
                        elif (emotion1 == "sadness" and emotion2 == "anger") or (emotion1 == "anger" and emotion2 == "sadness"):
                            st.write("💡 This suggests frustration or disappointment")
                        elif (emotion1 == "fear" and emotion2 == "surprise") or (emotion1 == "surprise" and emotion2 == "fear"):
                            st.write("💡 This indicates shock or alarm")
                    else:
                        st.markdown(f"**Clear emotional state:** {emotion_map.get(emotion1, '')} {emotion1}")
                else:
                    st.write("Single dominant emotion detected.")
            else:
                st.write("No significant emotional patterns detected.")

        with col2:
            st.subheader("Context Analysis")

            if is_sarcastic and sentiment == "POSITIVE":
                st.markdown("⚠️ **Potential Negative Connotation:** The positive sentiment might be misleading due to detected sarcasm.")
            elif is_sarcastic and sentiment == "NEGATIVE":
                st.markdown("⚠️ **Complex Expression:** Negative sentiment combined with sarcasm may indicate frustrated humor or ironic criticism.")
            elif sentiment == "MIXED":
                st.markdown("🔄 **Ambivalent Message:** The content expresses mixed or conflicting emotions.")
            elif sentiment == "POSITIVE" and sarcasm_score > 0.3:
                st.markdown("⚠️ **Moderate Sarcasm Indicators:** The positive sentiment might be qualified by subtle sarcasm.")
            elif sentiment == "NEGATIVE" and not is_sarcastic:
                st.markdown("😔 **Clear Negative Expression:** The content expresses genuine negative sentiment without sarcasm.")
            elif sentiment == "POSITIVE" and not is_sarcastic:
                st.markdown("😊 **Clear Positive Expression:** The content expresses genuine positive sentiment without sarcasm.")

    with st.expander("Debug Information", expanded=False):
        st.write("Debugging information for troubleshooting:")
        for i, debug_line in enumerate(st.session_state.debug_info[-10:]):
            st.text(f"{i + 1}. {debug_line}")
        if emotions_dict:
            st.write("Raw emotion scores:")
            for emotion, score in sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True):
                if score > 0.01:
                    st.text(f"{emotion}: {score:.4f}")

    with st.expander("Analysis Details", expanded=False):
        st.write("""
        *How this works:*
        1. *Speech Recognition*: Audio transcribed using OpenAI Whisper
        2. *Emotion Analysis*: DistilBERT model trained for six emotions
        3. *Sentiment Analysis*: Derived from dominant emotion
        4. *Sarcasm Detection*: RoBERTa model for irony detection
        *Accuracy depends on*:
        - Audio quality
        - Speech clarity
        - Background noise
        - Speech patterns
        """)

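# The browser recorder returns audio as a data URL such as
# "data:audio/webm;codecs=opus;base64,<payload>" (format assumed from the MediaRecorder
# settings above). process_base64_audio() strips the prefix, decodes the payload and writes
# it to a temporary file that goes through the same validation path as uploaded audio.
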
def process_base64_audio(base64_data):
    try:
        if not base64_data or not isinstance(base64_data, str) or not base64_data.startswith('data:'):
            st.error("Invalid audio data received")
            return None

        try:
            base64_binary = base64_data.split(',')[1]
        except IndexError:
            st.error("Invalid base64 data format")
            return None

        try:
            binary_data = base64.b64decode(base64_binary)
        except Exception as e:
            st.error(f"Failed to decode base64 data: {str(e)}")
            return None

        # The blob is saved with a .wav extension even though the browser records WebM/Opus;
        # ffmpeg (via pydub) generally detects the real container from the contents rather
        # than the extension.
        temp_dir = tempfile.gettempdir()
        temp_file_path = os.path.join(temp_dir, f"recording_{int(time.time())}.wav")

        with open(temp_file_path, "wb") as f:
            f.write(binary_data)

        if not validate_audio(temp_file_path):
            st.warning("Audio quality may not be optimal, but we'll try to process it.")

        return temp_file_path
    except Exception as e:
        st.error(f"Error processing audio data: {str(e)}")
        return None

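# NOTE: preload_models() warms the model caches from plain background threads. Streamlit may
# log "missing ScriptRunContext" warnings for work done outside the script thread; the warm-up
# is best effort, and if it fails the cached loaders are simply invoked again on first use.
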
def preload_models():
    threading.Thread(target=load_whisper_model).start()
    threading.Thread(target=get_emotion_classifier).start()
    threading.Thread(target=get_sarcasm_classifier).start()

def main():
    if 'debug_info' not in st.session_state:
        st.session_state.debug_info = []
    if 'models_loaded' not in st.session_state:
        st.session_state.models_loaded = False

    if not st.session_state.models_loaded:
        preload_models()
        st.session_state.models_loaded = True

    tab1, tab2 = st.tabs(["📁 Upload Audio", "🎙️ Record Audio"])

    with tab1:
        st.header("Upload an Audio File")
        audio_file = st.file_uploader("Choose an audio file", type=["wav", "mp3", "ogg", "m4a", "flac"],
                                      help="Upload an audio file for sentiment analysis (WAV, MP3, OGG, M4A, FLAC)")

        if audio_file:
            st.audio(audio_file.getvalue())
            st.caption("🎧 Uploaded Audio Playback")

            progress_placeholder = st.empty()

            upload_button = st.button("Analyze Upload", key="analyze_upload")

            if upload_button:
                progress_bar = progress_placeholder.progress(0, text="Preparing audio...")

                temp_audio_path = process_uploaded_audio(audio_file)

                if temp_audio_path:
                    progress_bar.progress(25, text="Transcribing audio...")

                    main_text, alternatives = transcribe_audio(temp_audio_path, show_alternative=True)

                    if main_text:
                        progress_bar.progress(60, text="Analyzing sentiment and emotions...")

                        if alternatives:
                            with st.expander("Alternative transcriptions detected", expanded=False):
                                for i, alt in enumerate(alternatives[:3], 1):
                                    st.write(f"{i}. {alt}")

                        progress_bar.progress(90, text="Finalizing results...")
                        display_analysis_results(main_text)

                        progress_bar.progress(100, text="Analysis complete!")
                        progress_placeholder.empty()
                    else:
                        progress_placeholder.empty()
                        st.error("Could not transcribe the audio. Please try again with clearer audio.")

                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                else:
                    progress_placeholder.empty()
                    st.error("Could not process the audio file. Please try a different file.")

    with tab2:
        st.header("Record Your Voice")
        st.write("Use the recorder below to analyze your speech in real-time.")

        st.subheader("Browser-Based Recorder")
        st.write("Click the button below to start/stop recording.")

        audio_data = custom_audio_recorder()

        if audio_data:
            progress_placeholder = st.empty()

            analyze_rec_button = st.button("Analyze Recording", key="analyze_rec")

            if analyze_rec_button:
                progress_bar = progress_placeholder.progress(0, text="Processing recording...")

                temp_audio_path = process_base64_audio(audio_data)

                if temp_audio_path:
                    progress_bar.progress(30, text="Transcribing speech...")

                    transcribed_text = transcribe_audio(temp_audio_path)

                    if transcribed_text:
                        progress_bar.progress(70, text="Analyzing sentiment and emotions...")

                        display_analysis_results(transcribed_text)

                        progress_bar.progress(100, text="Analysis complete!")
                        progress_placeholder.empty()
                    else:
                        progress_placeholder.empty()
                        st.error("Could not transcribe the audio. Please try speaking more clearly.")

                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                else:
                    progress_placeholder.empty()
                    st.error("Could not process the recording. Please try again.")

        st.subheader("Manual Text Input")
        st.write("If recording doesn't work, you can type your text here:")

        manual_text = st.text_area("Enter text to analyze:", placeholder="Type what you want to analyze...")
        analyze_text_button = st.button("Analyze Text", key="analyze_manual")

        if analyze_text_button and manual_text:
            with st.spinner("Analyzing text..."):
                display_analysis_results(manual_text)

    show_model_info()

    st.sidebar.markdown("---")
    st.sidebar.caption("Voice Sentiment Analysis v2.0")
    st.sidebar.caption("Optimized for speed and accuracy")


if __name__ == "__main__":
    main()