# File size: 11,296 Bytes
import os
import streamlit as st
import tempfile
import torch
import transformers
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import plotly.express as px
import logging
import warnings
import whisper
from pydub import AudioSegment
import time
import base64
import io
import streamlit.components.v1 as components
# Suppress warnings for a clean console: only CRITICAL records from the
# "torch" and "transformers" loggers are let through, and all Python warnings
# are filtered out.
logging.getLogger("torch").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Check if CUDA is available, otherwise use CPU. `device` is module-level state
# read by the model-loading functions further down in this file.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Set Streamlit app layout
# NOTE(review): this is the first Streamlit call in the module; presumably it
# has to stay ahead of every other `st.*` call -- confirm before reordering.
st.set_page_config(layout="wide", page_title="Voice Based Sentiment Analysis")
# Interface design
# NOTE(review): the leading glyph in the title looks like a mis-encoded emoji;
# confirm the intended character against the original file.
st.title("π Voice Based Sentiment Analysis")
st.write("Detect emotions, sentiment, and sarcasm from your voice with optimized speed and accuracy using OpenAI Whisper.")
# Emotion Detection Function
@st.cache_resource
def get_emotion_classifier():
    """Build the emotion text-classification pipeline.

    Loads the tokenizer and model for
    ``bhadresh-savani/distilbert-base-uncased-emotion``, moves the model to
    the module-level ``device`` and converts it to half precision when CUDA
    is available.

    Returns:
        A ``text-classification`` pipeline created with ``top_k=None``, or
        ``None`` when anything fails while loading (the failure is reported
        through ``st.error``).
    """
    checkpoint = "bhadresh-savani/distilbert-base-uncased-emotion"
    try:
        emotion_tokenizer = AutoTokenizer.from_pretrained(checkpoint, use_fast=True)
        emotion_model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
        emotion_model = emotion_model.to(device)
        if torch.cuda.is_available():
            # Use fp16 on GPU
            emotion_model = emotion_model.half()
        pipeline_device = 0 if torch.cuda.is_available() else -1
        return pipeline(
            "text-classification",
            model=emotion_model,
            tokenizer=emotion_tokenizer,
            top_k=None,
            device=pipeline_device,
        )
    except Exception as load_error:
        st.error(f"Failed to load emotion model: {str(load_error)}")
        return None
def perform_emotion_detection(text):
    """Classify the emotions in *text* and derive a coarse sentiment.

    Args:
        text: Transcribed text to analyse.

    Returns:
        A 4-tuple ``(emotions_dict, top_emotion, emotion_map, sentiment)``:

        * ``emotions_dict`` -- every label returned by the classifier mapped
          to its score;
        * ``top_emotion`` -- the highest-scoring label;
        * ``emotion_map`` -- label-to-emoji lookup table;
        * ``sentiment`` -- ``"POSITIVE"``, ``"NEGATIVE"`` or ``"NEUTRAL"``.

        ``({}, "neutral", {}, "NEUTRAL")`` is returned when *text* is empty or
        shorter than three characters after stripping, when the classifier
        could not be loaded, or when classification raises (the error is then
        reported through ``st.error``).
    """
    try:
        if not text or len(text.strip()) < 3:
            return {}, "neutral", {}, "NEUTRAL"
        emotion_classifier = get_emotion_classifier()
        if not emotion_classifier:
            return {}, "neutral", {}, "NEUTRAL"
        # truncation=True is forwarded to the tokenizer so that a long
        # transcript is cut to the model's maximum length instead of failing.
        emotion_results = emotion_classifier(text, truncation=True)[0]
        # NOTE(review): the pre-existing emoji literals look mis-encoded
        # (mojibake); they are kept unchanged here -- confirm the intended
        # characters against the original file.
        emotion_map = {
            "joy": "π", "anger": "π‘", "disgust": "π€’", "fear": "π¨",
            "sadness": "π", "surprise": "π²", "love": "❤️",
        }
        # The checkpoint's model card lists a "love" label; without it here a
        # dominant "love" prediction was reported as NEUTRAL.
        positive_emotions = {"joy", "love"}
        negative_emotions = {"anger", "disgust", "fear", "sadness"}
        emotions_dict = {result['label']: result['score'] for result in emotion_results}
        # Ignore near-zero scores when picking the winner, unless that would
        # leave nothing to choose from.
        filtered_emotions = {k: v for k, v in emotions_dict.items() if v > 0.01}
        if not filtered_emotions:
            filtered_emotions = emotions_dict
        top_emotion = max(filtered_emotions, key=filtered_emotions.get)
        if top_emotion in positive_emotions:
            sentiment = "POSITIVE"
        elif top_emotion in negative_emotions:
            sentiment = "NEGATIVE"
        else:
            # Anything else (e.g. "surprise") is treated as neutral.
            sentiment = "NEUTRAL"
        return emotions_dict, top_emotion, emotion_map, sentiment
    except Exception as e:
        st.error(f"Emotion detection failed: {str(e)}")
        return {}, "neutral", {}, "NEUTRAL"
# Sarcasm Detection Function
@st.cache_resource
def get_sarcasm_classifier():
    """Build the sarcasm (irony) text-classification pipeline.

    Loads the tokenizer and model for ``cardiffnlp/twitter-roberta-base-irony``,
    moves the model to the module-level ``device`` and converts it to half
    precision when CUDA is available.

    Returns:
        A ``text-classification`` pipeline, or ``None`` when anything fails
        while loading (the failure is reported through ``st.error``).
    """
    checkpoint = "cardiffnlp/twitter-roberta-base-irony"
    try:
        irony_tokenizer = AutoTokenizer.from_pretrained(checkpoint, use_fast=True)
        irony_model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
        irony_model = irony_model.to(device)
        if torch.cuda.is_available():
            # Use fp16 on GPU
            irony_model = irony_model.half()
        pipeline_device = 0 if torch.cuda.is_available() else -1
        return pipeline(
            "text-classification",
            model=irony_model,
            tokenizer=irony_tokenizer,
            device=pipeline_device,
        )
    except Exception as load_error:
        st.error(f"Failed to load sarcasm model: {str(load_error)}")
        return None
def perform_sarcasm_detection(text):
    """Estimate whether *text* is sarcastic.

    Args:
        text: Transcribed text to analyse.

    Returns:
        ``(is_sarcastic, sarcasm_score)`` where ``sarcasm_score`` is the
        classifier's score for the sarcastic class: the reported score when
        the predicted label is the sarcastic one, otherwise ``1 - score``.
        ``(False, 0.0)`` is returned when *text* is empty or shorter than
        three characters after stripping, when the classifier could not be
        loaded, or when classification raises (the error is then reported
        through ``st.error``).
    """
    try:
        if not text or len(text.strip()) < 3:
            return False, 0.0
        sarcasm_classifier = get_sarcasm_classifier()
        if not sarcasm_classifier:
            return False, 0.0
        # truncation=True is forwarded to the tokenizer so that a long
        # transcript is cut to the model's maximum length instead of failing.
        result = sarcasm_classifier(text, truncation=True)[0]
        # Accept both the generic "LABEL_1" name and a human-readable "irony"
        # name for the positive class.
        # NOTE(review): which of the two the checkpoint's config emits depends
        # on the downloaded revision -- confirm against its config.json.
        predicted_label = str(result['label']).lower()
        is_sarcastic = predicted_label in {"label_1", "irony"}
        sarcasm_score = result['score'] if is_sarcastic else 1 - result['score']
        return is_sarcastic, sarcasm_score
    except Exception as e:
        st.error(f"Sarcasm detection failed: {str(e)}")
        return False, 0.0
# Validate audio quality
def validate_audio(audio_path):
    """Check that the audio file is loud enough and long enough to analyse.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        ``True`` when the file loads, its ``dBFS`` is at least -55 and its
        length is at least 1000 (NOTE(review): presumably milliseconds, per
        pydub's ``AudioSegment`` -- confirm). Otherwise ``False``, after
        showing a warning (too quiet / too short) or an error (unreadable).
    """
    try:
        clip = AudioSegment.from_file(audio_path)
        problem = None
        if clip.dBFS < -55:
            problem = "Audio volume is too low."
        elif len(clip) < 1000:
            problem = "Audio is too short."
        if problem is not None:
            st.warning(problem)
            return False
        return True
    except Exception as load_error:
        st.error(f"Invalid audio file: {str(load_error)}")
        return False
# Speech Recognition with Whisper
@st.cache_resource
def load_whisper_model():
    """Load the Whisper ``base`` model onto the module-level ``device``.

    Returns:
        The loaded model, or ``None`` when loading fails (the failure is
        reported through ``st.error``).
    """
    try:
        whisper_model = whisper.load_model("base")
        return whisper_model.to(device)
    except Exception as load_error:
        st.error(f"Failed to load Whisper model: {str(load_error)}")
        return None
def transcribe_audio(audio_path):
    """Transcribe the audio file at *audio_path* to English text with Whisper.

    The audio is first re-exported as a 16000 Hz mono WAV into a private
    temporary file, which is always removed before returning.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The stripped transcript, or ``""`` when the Whisper model is
        unavailable or any step fails (failures are reported through
        ``st.error``).
    """
    temp_wav_path = None
    try:
        sound = AudioSegment.from_file(audio_path).set_frame_rate(16000).set_channels(1)
        # mkstemp gives a unique name; a name built from the current second
        # could collide when two transcriptions start within the same second.
        fd, temp_wav_path = tempfile.mkstemp(prefix="temp_", suffix=".wav")
        # Only the path is needed; close the descriptor so the export can
        # reopen the file by name.
        os.close(fd)
        sound.export(temp_wav_path, format="wav")
        model = load_whisper_model()
        if not model:
            return ""
        result = model.transcribe(temp_wav_path, language="en", fp16=torch.cuda.is_available())
        return result["text"].strip()
    except Exception as e:
        st.error(f"Transcription failed: {str(e)}")
        return ""
    finally:
        if temp_wav_path and os.path.exists(temp_wav_path):
            os.remove(temp_wav_path)
# Process uploaded audio files
def process_uploaded_audio(audio_file):
    """Save an uploaded audio file to a temporary path and validate it.

    Args:
        audio_file: Uploaded file object exposing ``name`` and ``getvalue()``.

    Returns:
        The path of the temporary copy when the upload has a supported
        extension (wav, mp3, ogg) and passes ``validate_audio``; otherwise
        ``None``. On success the file is left on disk so it can be read by
        the caller, who is responsible for deleting it afterwards.
    """
    if not audio_file:
        return None
    temp_file_path = None
    succeeded = False
    try:
        ext = audio_file.name.split('.')[-1].lower()
        if ext not in ['wav', 'mp3', 'ogg']:
            st.error("Unsupported audio format. Use WAV, MP3, or OGG.")
            return None
        # mkstemp gives a unique name; a name built from the current second
        # could collide when two uploads arrive within the same second.
        fd, temp_file_path = tempfile.mkstemp(prefix="uploaded_", suffix=f".{ext}")
        with os.fdopen(fd, "wb") as f:
            f.write(audio_file.getvalue())
        if not validate_audio(temp_file_path):
            return None
        succeeded = True
        return temp_file_path
    except Exception as e:
        st.error(f"Error processing uploaded audio: {str(e)}")
        return None
    finally:
        # Remove the file only on failure: deleting it on success would hand
        # the caller a path that no longer exists. The 'keep_temp' session
        # flag still suppresses deletion, as before.
        if (
            not succeeded
            and temp_file_path
            and os.path.exists(temp_file_path)
            and not st.session_state.get('keep_temp', False)
        ):
            os.remove(temp_file_path)
# Show model information
def show_model_info():
    """Render the sidebar section listing the models used by the app."""
    # Same markdown text as a bullet list, one bullet per model.
    model_details = (
        "\n"
        "- *Emotion*: DistilBERT (bhadresh-savani/distilbert-base-uncased-emotion)\n"
        "- *Sarcasm*: RoBERTa (cardiffnlp/twitter-roberta-base-irony)\n"
        "- *Speech*: OpenAI Whisper (base)\n"
    )
    sidebar = st.sidebar
    sidebar.header("π§ About the Models")
    with sidebar.expander("Model Details"):
        st.markdown(model_details)
# Custom audio recorder
def custom_audio_recorder():
    """Render an in-browser start/stop recording button.

    Shows a warning that microphone access is needed, then embeds an HTML
    snippet. Its script asks for an audio stream via
    ``navigator.mediaDevices.getUserMedia``, records it with ``MediaRecorder``
    and, when recording stops, reads the collected chunks as a data URL and
    posts it to the parent window in a ``streamlit:setComponentValue``
    message. The button label toggles between "Start Recording" and
    "Stop Recording".

    Returns:
        Whatever ``components.html`` returns.
        NOTE(review): whether ``components.html`` relays the posted value back
        as its return value is not visible here -- confirm against the
        Streamlit components documentation before relying on it as audio data.
    """
    st.warning("Recording requires microphone access and a modern browser.")
    # NOTE(review): the script labels the Blob 'audio/wav' regardless of what
    # MediaRecorder actually produced -- confirm the real container format.
    audio_recorder_html = """
<script>
let recorder, stream;
async function startRecording() {
try {
stream = await navigator.mediaDevices.getUserMedia({ audio: true });
recorder = new MediaRecorder(stream);
const chunks = [];
recorder.ondataavailable = e => chunks.push(e.data);
recorder.onstop = () => {
const blob = new Blob(chunks, { type: 'audio/wav' });
const reader = new FileReader();
reader.onloadend = () => {
window.parent.postMessage({type: "streamlit:setComponentValue", value: reader.result}, "*");
};
reader.readAsDataURL(blob);
stream.getTracks().forEach(track => track.stop());
};
recorder.start();
document.getElementById('record-btn').textContent = 'Stop Recording';
} catch (e) { alert('Recording failed: ' + e.message); }
}
function stopRecording() {
recorder.stop();
document.getElementById('record-btn').textContent = 'Start Recording';
}
function toggleRecording() {
if (!recorder || recorder.state === 'inactive') startRecording();
else stopRecording();
}
</script>
<button id="record-btn" onclick="toggleRecording()">Start Recording</button>
"""
    # Embed the snippet in a 100-pixel-high frame.
    return components.html(audio_recorder_html, height=100)
# Display analysis results
def display_analysis_results(transcribed_text):
    """Run emotion and sarcasm analysis on the transcript and render it.

    Shows the transcript in a read-only text area, the sentiment (with the
    top emotion) and the sarcasm verdict (with its score) side by side, and a
    bar chart of the emotion scores when any are available.

    Args:
        transcribed_text: Text produced by the speech-recognition step.
    """
    scores_by_emotion, dominant_emotion, _emoji_lookup, overall_sentiment = (
        perform_emotion_detection(transcribed_text)
    )
    sarcasm_found, sarcasm_confidence = perform_sarcasm_detection(transcribed_text)

    st.header("Results")
    st.text_area("Transcribed Text", transcribed_text, height=100, disabled=True)

    sentiment_col, sarcasm_col = st.columns(2)
    with sentiment_col:
        st.subheader("Sentiment")
        st.write(f"{overall_sentiment} ({dominant_emotion})")
    with sarcasm_col:
        st.subheader("Sarcasm")
        verdict = 'Detected' if sarcasm_found else 'Not Detected'
        st.write(f"{verdict} (Score: {sarcasm_confidence:.2f})")

    if scores_by_emotion:
        emotion_labels = list(scores_by_emotion.keys())
        emotion_scores = list(scores_by_emotion.values())
        chart = px.bar(x=emotion_labels, y=emotion_scores, labels={'x': 'Emotion', 'y': 'Score'})
        st.plotly_chart(chart)
# Process base64 audio
def process_base64_audio(base64_data):
    """Decode a recorded data URL into a temporary audio file and validate it.

    Args:
        base64_data: String of the form ``"<prefix>,<base64 payload>"``; only
            the part after the first comma is decoded.

    Returns:
        The path of the temporary file when decoding succeeds and the audio
        passes ``validate_audio``; otherwise ``None``. On success the file is
        left on disk so it can be read by the caller, who is responsible for
        deleting it afterwards.
    """
    # Nothing recorded: same quiet early exit as process_uploaded_audio.
    if not base64_data:
        return None
    temp_file_path = None
    succeeded = False
    try:
        audio_bytes = base64.b64decode(base64_data.split(',')[1])
        # mkstemp gives a unique name; a name built from the current second
        # could collide when two recordings arrive within the same second.
        fd, temp_file_path = tempfile.mkstemp(prefix="rec_", suffix=".wav")
        with os.fdopen(fd, "wb") as f:
            f.write(audio_bytes)
        if not validate_audio(temp_file_path):
            return None
        succeeded = True
        return temp_file_path
    except Exception as e:
        st.error(f"Error processing recorded audio: {str(e)}")
        return None
    finally:
        # Remove the file only on failure: deleting it on success would hand
        # the caller a path that no longer exists.
        if not succeeded and temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)
# Main App Logic
def _analyze_audio_file(temp_path):
    """Transcribe the audio at *temp_path*, show the results, then clean up."""
    if not temp_path:
        return
    try:
        text = transcribe_audio(temp_path)
        if text:
            display_analysis_results(text)
    finally:
        # The temporary copy is no longer needed once analysis has run; the
        # 'keep_temp' session flag suppresses deletion.
        if os.path.exists(temp_path) and not st.session_state.get('keep_temp', False):
            os.remove(temp_path)


def main():
    """Lay out the app: an upload tab, a recording tab and the model sidebar.

    Each tab offers an "Analyze" button that prepares a temporary audio file,
    transcribes it and renders the analysis while a spinner is shown.
    """
    tab1, tab2 = st.tabs(["Upload Audio", "Record Audio"])
    with tab1:
        audio_file = st.file_uploader("Upload Audio", type=["wav", "mp3", "ogg"])
        if audio_file:
            st.audio(audio_file)
            if st.button("Analyze Uploaded Audio"):
                with st.spinner("Analyzing..."):
                    _analyze_audio_file(process_uploaded_audio(audio_file))
    with tab2:
        audio_data = custom_audio_recorder()
        if audio_data and st.button("Analyze Recorded Audio"):
            with st.spinner("Analyzing..."):
                _analyze_audio_file(process_base64_audio(audio_data))
    show_model_info()
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()