import os
import subprocess
import tempfile
import time
import wave
import logging
import warnings

import numpy as np
import librosa
import plotly.express as px
import pyaudio
import streamlit as st
import torch
import whisper
from pydub import AudioSegment
from transformers import (
    pipeline,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    Wav2Vec2FeatureExtractor,
    Wav2Vec2ForSequenceClassification,
)


logging.getLogger("torch").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
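
# Runtime notes (assumptions, not stated in the script itself): the audio-effects step
# shells out to ffmpeg, so an ffmpeg binary is expected on PATH, and the imports above
# imply roughly these packages: streamlit, torch, transformers, openai-whisper, pydub,
# librosa, plotly, pyaudio, numpy.
#
# Pipeline overview: uploaded or recorded audio is validated, run through an audio-effects
# pass, classified with Wav2Vec2 (audio emotion), transcribed with Whisper, classified with
# the GoEmotions RoBERTa model (text emotion) and an irony model, and the audio/text scores
# are fused (0.7 audio, 0.3 text) for display.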

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

st.set_page_config(layout="wide", page_title="Advanced Voice Emotion Analyzer")

st.title("🎙️ Advanced Voice Emotion Analyzer")
st.write("Analyze emotions from audio using hybrid audio and text models covering 27 emotion categories.")


def make_audio_scarier(audio_path, output_path):
    """Apply pitch, echo, and tempo effects to intensify the audio before analysis."""
    temp1 = os.path.join(tempfile.gettempdir(), "temp1.wav")
    temp2 = os.path.join(tempfile.gettempdir(), "temp2.wav")
    try:
        # Run each ffmpeg stage with an argument list (no shell) so paths are passed safely.
        # aecho stands in for the original 'reverb' filter, which does not exist in ffmpeg.
        commands = [
            ["ffmpeg", "-y", "-i", audio_path, "-af", "asetrate=44100*0.8,aresample=44100", temp1],
            ["ffmpeg", "-y", "-i", temp1, "-af", "aecho=0.8:0.88:60:0.4", temp2],
            ["ffmpeg", "-y", "-i", temp2, "-af", "atempo=1.2", output_path],
        ]
        for cmd in commands:
            subprocess.run(cmd, check=True, capture_output=True)
    except Exception as e:
        st.error(f"Audio processing failed: {str(e)}")
        raise
    finally:
        for temp_file in (temp1, temp2):
            if os.path.exists(temp_file):
                os.remove(temp_file)


def extract_audio_features(audio_path):
    """Extract simple prosodic features used by the heuristic boosts further below."""
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        # Compute the pitch track once instead of three times.
        pitches, _ = librosa.piptrack(y=y, sr=sr)
        voiced = pitches[pitches > 0]
        pitch_mean = float(np.mean(voiced)) if voiced.size > 0 else 0.0
        energy_mean = float(np.mean(librosa.feature.rms(y=y)))
        zcr_mean = float(np.mean(librosa.feature.zero_crossing_rate(y)))
        return {"pitch_mean": pitch_mean, "energy_mean": energy_mean, "zcr_mean": zcr_mean}
    except Exception as e:
        st.error(f"Audio feature extraction failed: {str(e)}")
        return {}


@st.cache_resource
def get_audio_emotion_classifier():
    # The SUPERB emotion-recognition checkpoint ships a feature extractor rather than a
    # full processor with a tokenizer, so load the feature extractor directly.
    feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("superb/wav2vec2-base-superb-er")
    model = Wav2Vec2ForSequenceClassification.from_pretrained("superb/wav2vec2-base-superb-er")
    model = model.to(device)
    return feature_extractor, model
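
# The audio model covers only a handful of basic emotions. perform_audio_emotion_detection
# therefore reads its labels from model.config.id2label (so they always match the logits)
# and then applies rough prosody-based boosts: low mean pitch combined with high energy and
# a high zero-crossing rate nudges "fearful", while very high energy nudges "angry". The
# thresholds are hand-tuned heuristics, not learned parameters.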


def perform_audio_emotion_detection(audio_path):
    try:
        feature_extractor, model = get_audio_emotion_classifier()
        waveform, sample_rate = librosa.load(audio_path, sr=16000)
        inputs = feature_extractor(waveform, sampling_rate=16000, return_tensors="pt", padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = model(**inputs).logits
        scores = torch.softmax(logits, dim=1).detach().cpu().numpy()[0]

        # Map the model's short label names (e.g. "neu", "hap") to the full names used
        # elsewhere in the app; unknown labels pass through unchanged.
        label_map = {"neu": "neutral", "hap": "happy", "ang": "angry", "sad": "sad"}
        id2label = model.config.id2label
        emotion_dict = {label_map.get(id2label[i], id2label[i]): float(scores[i]) for i in range(len(scores))}
        top_emotion = max(emotion_dict, key=emotion_dict.get)

        features = extract_audio_features(audio_path)
        if features.get("pitch_mean", 0) < 200 and features.get("energy_mean", 0) > 0.1 and features.get("zcr_mean", 0) > 0.1:
            emotion_dict["fearful"] = min(1.0, emotion_dict.get("fearful", 0) + 0.3)
            top_emotion = "fearful" if emotion_dict["fearful"] > emotion_dict[top_emotion] else top_emotion
        elif features.get("energy_mean", 0) > 0.2:
            emotion_dict["angry"] = min(1.0, emotion_dict.get("angry", 0) + 0.2)
            top_emotion = "angry" if emotion_dict["angry"] > emotion_dict[top_emotion] else top_emotion
        return emotion_dict, top_emotion
    except Exception as e:
        st.error(f"Audio emotion detection failed: {str(e)}")
        return {}, "unknown"


@st.cache_resource
def get_text_emotion_classifier():
    tokenizer = AutoTokenizer.from_pretrained("SamLowe/roberta-base-go_emotions", use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained("SamLowe/roberta-base-go_emotions")
    model = model.to(device)
    return pipeline("text-classification", model=model, tokenizer=tokenizer, top_k=None, device=-1 if device.type == "cpu" else 0)


def perform_text_emotion_detection(text):
    try:
        classifier = get_text_emotion_classifier()
        results = classifier(text)[0]
        emotions = ["admiration", "amusement", "anger", "annoyance", "approval", "caring", "confusion",
                    "curiosity", "desire", "disappointment", "disapproval", "disgust", "embarrassment",
                    "excitement", "fear", "gratitude", "grief", "joy", "love", "nervousness", "optimism",
                    "pride", "realization", "relief", "remorse", "sadness", "surprise", "neutral"]
        emotions_dict = {result['label']: result['score'] for result in results if result['label'] in emotions}
        top_emotion = max(emotions_dict, key=emotions_dict.get)
        return emotions_dict, top_emotion
    except Exception as e:
        st.error(f"Text emotion detection failed: {str(e)}")
        return {}, "unknown"


@st.cache_resource
def get_sarcasm_classifier():
    tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-irony", use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-irony")
    model = model.to(device)
    return pipeline("text-classification", model=model, tokenizer=tokenizer, device=-1 if device.type == "cpu" else 0)


def perform_sarcasm_detection(text):
    try:
        classifier = get_sarcasm_classifier()
        result = classifier(text)[0]
        # Depending on the checkpoint revision, the positive class is reported as
        # "LABEL_1" or "irony"; accept either.
        is_sarcastic = result['label'] in ("LABEL_1", "irony")
        sarcasm_score = result['score'] if is_sarcastic else 1 - result['score']
        return is_sarcastic, sarcasm_score
    except Exception as e:
        st.error(f"Sarcasm detection failed: {str(e)}")
        return False, 0.0


def validate_audio(audio_path):
    try:
        sound = AudioSegment.from_file(audio_path)
        if sound.dBFS < -50 or len(sound) < 1000:
            st.warning("Audio volume too low or clip too short. Please provide a louder or longer recording.")
            return False
        return True
    except Exception:
        st.error("Invalid or corrupted audio file.")
        return False
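
# Transcription: the audio is converted to 16 kHz mono WAV with pydub before being passed
# to Whisper (large-v3), and the intermediate WAV file is removed afterwards.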


@st.cache_resource
def load_whisper_model():
    return whisper.load_model("large-v3")


def transcribe_audio(audio_path):
    try:
        sound = AudioSegment.from_file(audio_path)
        temp_wav_path = os.path.join(tempfile.gettempdir(), "temp_converted.wav")
        sound = sound.set_frame_rate(16000).set_channels(1)
        sound.export(temp_wav_path, format="wav")
        model = load_whisper_model()
        result = model.transcribe(temp_wav_path, language="en")
        os.remove(temp_wav_path)
        return result["text"].strip()
    except Exception as e:
        st.error(f"Transcription failed: {str(e)}")
        return ""


def record_audio():
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    RECORD_SECONDS = st.slider("Recording duration (seconds)", 1, 30, 5)

    if st.button("Start Recording"):
        # Open the audio stream only after the button is pressed, so Streamlit reruns
        # do not leak PyAudio handles.
        p = pyaudio.PyAudio()
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
        st.write("Recording...")
        frames = []
        for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            frames.append(stream.read(CHUNK))
        st.write("Recording finished.")

        sample_width = p.get_sample_size(FORMAT)
        stream.stop_stream()
        stream.close()
        p.terminate()

        temp_file_path = os.path.join(tempfile.gettempdir(), f"recorded_audio_{int(time.time())}.wav")
        wf = wave.open(temp_file_path, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()

        # Remember the path across Streamlit reruns so "Analyze Recording" can find it.
        st.session_state["recorded_audio_path"] = temp_file_path

    return st.session_state.get("recorded_audio_path")


def process_audio_file(audio_data):
    temp_dir = tempfile.gettempdir()
    temp_file_path = os.path.join(temp_dir, f"audio_{int(time.time())}.wav")
    with open(temp_file_path, "wb") as f:
        if isinstance(audio_data, str):
            with open(audio_data, "rb") as f_audio:
                f.write(f_audio.read())
        else:
            f.write(audio_data.getvalue())
    if not validate_audio(temp_file_path):
        return None
    return temp_file_path
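
# display_analysis_results drives the full analysis for one clip: apply the audio effects,
# run audio-based emotion detection, transcribe and run text-based emotion and sarcasm
# detection, fuse the two score sets (0.7 audio, 0.3 text), render the results, and delete
# the temporary files.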


def display_analysis_results(audio_path):
    st.header("Audio Analysis")
    st.audio(audio_path)

    # All downstream analysis (emotion detection and transcription) runs on the
    # effect-processed copy of the audio.
    processed_audio_path = os.path.join(tempfile.gettempdir(), f"processed_{int(time.time())}.wav")
    make_audio_scarier(audio_path, processed_audio_path)

    audio_emotions, audio_top_emotion = perform_audio_emotion_detection(processed_audio_path)
    st.subheader("Audio-Based Emotion")
    st.write(f"**Dominant Emotion:** {audio_top_emotion} (Score: {audio_emotions.get(audio_top_emotion, 0):.3f})")

    transcribed_text = transcribe_audio(processed_audio_path)
    st.subheader("Transcribed Text")
    st.text_area("Text", transcribed_text, height=100, disabled=True)
    if transcribed_text:
        text_emotions, text_top_emotion = perform_text_emotion_detection(transcribed_text)
        st.write(f"**Text-Based Dominant Emotion:** {text_top_emotion} (Score: {text_emotions.get(text_top_emotion, 0):.3f})")

    # Fuse audio and text scores: map the audio labels onto the GoEmotions label set,
    # take the audio scores weighted by 0.7, then add the text scores weighted by 0.3.
    emotion_map = {
        "neutral": "neutral", "happy": "joy", "sad": "sadness", "angry": "anger",
        "fearful": "fear", "surprise": "surprise", "disgust": "disgust"
    }
    combined_emotions = {emotion: 0 for emotion in ["admiration", "amusement", "anger", "annoyance", "approval", "caring",
                                                    "confusion", "curiosity", "desire", "disappointment", "disapproval",
                                                    "disgust", "embarrassment", "excitement", "fear", "gratitude",
                                                    "grief", "joy", "love", "nervousness", "optimism", "pride",
                                                    "realization", "relief", "remorse", "sadness", "surprise", "neutral"]}
    for audio_emotion, score in audio_emotions.items():
        mapped_emotion = emotion_map.get(audio_emotion, "neutral")
        combined_emotions[mapped_emotion] = max(combined_emotions[mapped_emotion], score * 0.7)
    if transcribed_text:
        for text_emotion, score in text_emotions.items():
            combined_emotions[text_emotion] = combined_emotions.get(text_emotion, 0) + score * 0.3

    top_emotion = max(combined_emotions, key=combined_emotions.get)
    positive_emotions = {"admiration", "amusement", "approval", "caring", "desire", "excitement",
                         "gratitude", "joy", "love", "optimism", "pride", "relief"}
    negative_emotions = {"anger", "annoyance", "disappointment", "disapproval", "disgust", "embarrassment",
                         "fear", "grief", "nervousness", "remorse", "sadness"}
    if top_emotion in positive_emotions:
        sentiment = "POSITIVE"
    elif top_emotion in negative_emotions:
        sentiment = "NEGATIVE"
    else:
        sentiment = "NEUTRAL"

    is_sarcastic, sarcasm_score = perform_sarcasm_detection(transcribed_text) if transcribed_text else (False, 0.0)

    col1, col2 = st.columns([1, 2])
    with col1:
        st.subheader("Sentiment")
        sentiment_icon = "😊" if sentiment == "POSITIVE" else "😞" if sentiment == "NEGATIVE" else "😐"
        st.markdown(f"**{sentiment_icon} {sentiment.capitalize()}** (Based on {top_emotion})")
        st.subheader("Sarcasm")
        sarcasm_icon = "😏" if is_sarcastic else "🙂"
        st.markdown(f"**{sarcasm_icon} {'Detected' if is_sarcastic else 'Not Detected'}** (Score: {sarcasm_score:.3f})")

    with col2:
        st.subheader("Emotion Distribution")
        sorted_emotions = sorted(combined_emotions.items(), key=lambda x: x[1], reverse=True)[:10]
        emotions, scores = zip(*sorted_emotions)
        fig = px.bar(x=list(emotions), y=list(scores), labels={'x': 'Emotion', 'y': 'Score'},
                     title="Top Emotion Scores", color=list(emotions),
                     color_discrete_sequence=px.colors.qualitative.Bold)
        fig.update_layout(yaxis_range=[0, 1], showlegend=False, title_font_size=14)
        st.plotly_chart(fig, use_container_width=True)

    with st.expander("Details"):
        st.write(f"**Audio Features:** {extract_audio_features(processed_audio_path)}")
        st.write("""
        **How it works:**
        - Audio Emotion: Wav2Vec2 detects basic emotions from the audio signal.
        - Transcription: Whisper converts the audio to text.
        - Text Emotion: RoBERTa scores 27 emotions (plus neutral) from the text.
        - Sarcasm: A RoBERTa irony model checks the text for sarcasm.
        **Accuracy depends on:** audio quality, clarity, and background noise.
        """)

    # Clean up temporary files once the results have been rendered.
    for path in [audio_path, processed_audio_path]:
        if os.path.exists(path):
            os.remove(path)
    # Drop the stale recording reference if its file was just removed.
    if st.session_state.get("recorded_audio_path") == audio_path:
        del st.session_state["recorded_audio_path"]


def main():
    tab1, tab2 = st.tabs(["📁 Upload Audio", "🎙️ Record Audio"])

    with tab1:
        st.header("Upload Audio File")
        audio_file = st.file_uploader("Upload audio (wav, mp3, ogg)", type=["wav", "mp3", "ogg"])
        if audio_file:
            temp_audio_path = process_audio_file(audio_file)
            if temp_audio_path:
                if st.button("Analyze Upload"):
                    with st.spinner("Analyzing..."):
                        display_analysis_results(temp_audio_path)

    with tab2:
        st.header("Record Your Voice")
        st.write("Record audio to analyze the emotions it conveys.")
        temp_audio_path = record_audio()
        if temp_audio_path:
            if st.button("Analyze Recording"):
                with st.spinner("Processing..."):
                    display_analysis_results(temp_audio_path)

    st.sidebar.header("About")
    st.sidebar.write("""
    **Models Used:**
    - Audio: superb/wav2vec2-base-superb-er
    - Text: SamLowe/roberta-base-go_emotions (27 emotions + neutral)
    - Sarcasm: cardiffnlp/twitter-roberta-base-irony
    - Speech: OpenAI Whisper (large-v3)
    """)


if __name__ == "__main__":
    main()