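"""Real-time patient emotion recognition demo (Gradio app).

Analyzes facial expressions and voice tone during consultations, maps the raw
emotions to clinical categories (stress, anxiety, pain, confusion, comfort),
and surfaces alerts and suggestions for practitioners.
"""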
import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import threading
import queue
import time
from collections import deque
import warnings
warnings.filterwarnings("ignore")
# Try to import OpenCV with fallback
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
print("OpenCV not available - using PIL for image processing")
# Try to import librosa with fallback
try:
import librosa
LIBROSA_AVAILABLE = True
except ImportError:
LIBROSA_AVAILABLE = False
print("Librosa not available - using basic audio processing")
# Try to import transformers and torch, with fallbacks
try:
from transformers import pipeline
import torch
HF_AVAILABLE = True
except ImportError:
HF_AVAILABLE = False
print("Transformers not available - using mock emotion detection")
# Additional imports for image processing if OpenCV fails
try:
from PIL import Image, ImageDraw, ImageFont
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
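# Each *_AVAILABLE flag gates an optional dependency: when a library is missing,
# the corresponding code path below falls back to mock scores or simpler logic.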
class EmotionRecognitionSystem:
def __init__(self):
self.emotion_history = deque(maxlen=100) # Store last 100 emotion readings
self.audio_queue = queue.Queue()
self.video_queue = queue.Queue()
# Initialize emotion detection models
self.setup_models()
# Emotion thresholds for alerts
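        # Threshold values are heuristic starting points on a 0-1 scale (e.g. a pain
        # score above 0.8 raises an alert); tune them for the deployment setting.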
self.alert_thresholds = {
'stress': 0.7,
'anxiety': 0.6,
'pain': 0.8,
'confusion': 0.5
}
def setup_models(self):
"""Initialize emotion recognition models"""
if HF_AVAILABLE:
try:
                # Facial emotion recognition.
                # NOTE: this must be an image-classification model trained on facial
                # expressions (trpakov/vit-face-expression is one such checkpoint);
                # a text model like j-hartmann/emotion-english-distilroberta-base
                # cannot be used with the image-classification task.
                self.face_emotion_pipeline = pipeline(
                    "image-classification",
                    model="trpakov/vit-face-expression",
                    device=0 if torch.cuda.is_available() else -1
                )
# Audio emotion recognition
self.audio_emotion_pipeline = pipeline(
"audio-classification",
model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
device=0 if torch.cuda.is_available() else -1
)
self.models_loaded = True
except Exception as e:
print(f"Error loading models: {e}")
self.models_loaded = False
else:
self.models_loaded = False
def detect_face_emotion(self, frame):
"""Detect emotions from facial expressions"""
if not self.models_loaded:
# Mock emotion detection for demo
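            # (np.random.dirichlet draws a random probability vector that sums to 1,
            # so the mock scores look like a softmax over the emotion labels.)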
emotions = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise', 'disgust']
scores = np.random.dirichlet(np.ones(len(emotions)))
return dict(zip(emotions, scores))
try:
# Handle different image formats
if isinstance(frame, np.ndarray):
if CV2_AVAILABLE:
# Convert frame to RGB if it's BGR
if len(frame.shape) == 3 and frame.shape[2] == 3:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
rgb_frame = frame
else:
# Use numpy operations for color conversion
if len(frame.shape) == 3 and frame.shape[2] == 3:
rgb_frame = frame[:, :, ::-1] # BGR to RGB
else:
rgb_frame = frame
            else:
                rgb_frame = frame
            # The HF image-classification pipeline expects a PIL image (or a path/URL),
            # so convert numpy frames before inference when PIL is available.
            if isinstance(rgb_frame, np.ndarray) and PIL_AVAILABLE:
                rgb_frame = Image.fromarray(rgb_frame.astype(np.uint8))
            results = self.face_emotion_pipeline(rgb_frame)
# Convert to standardized format
emotion_scores = {}
for result in results:
emotion_scores[result['label'].lower()] = result['score']
return emotion_scores
except Exception as e:
print(f"Face emotion detection error: {e}")
return {'neutral': 1.0}
def detect_voice_emotion(self, audio_data, sample_rate=16000):
"""Detect emotions from voice tone"""
if not self.models_loaded or audio_data is None:
# Mock emotion detection
emotions = ['neutral', 'happy', 'sad', 'angry', 'fear']
scores = np.random.dirichlet(np.ones(len(emotions)))
return dict(zip(emotions, scores))
        try:
            # The audio pipeline expects mono float audio at the model's sampling
            # rate (16 kHz for this wav2vec2 checkpoint).
            audio_data = np.asarray(audio_data, dtype=np.float32)
            results = self.audio_emotion_pipeline(audio_data)
emotion_scores = {}
for result in results:
emotion_scores[result['label'].lower()] = result['score']
return emotion_scores
except Exception as e:
print(f"Voice emotion detection error: {e}")
return {'neutral': 1.0}
def extract_audio_features(self, audio_data, sample_rate):
"""Extract audio features for emotion analysis"""
if not LIBROSA_AVAILABLE:
# Return mock features if librosa is not available
return {
'mfcc_mean': np.random.random(),
'mfcc_std': np.random.random(),
'spectral_centroid_mean': np.random.random(),
'zcr_mean': np.random.random(),
'spectral_rolloff_mean': np.random.random()
}
try:
# Extract basic audio features
mfccs = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=13)
spectral_centroids = librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate)
zero_crossing_rate = librosa.feature.zero_crossing_rate(audio_data)
spectral_rolloff = librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate)
features = {
'mfcc_mean': np.mean(mfccs),
'mfcc_std': np.std(mfccs),
'spectral_centroid_mean': np.mean(spectral_centroids),
'zcr_mean': np.mean(zero_crossing_rate),
'spectral_rolloff_mean': np.mean(spectral_rolloff)
}
return features
except Exception as e:
print(f"Audio feature extraction error: {e}")
return {}
def combine_emotions(self, face_emotions, voice_emotions, weights=(0.6, 0.4)):
"""Combine facial and voice emotion predictions"""
combined = {}
all_emotions = set(face_emotions.keys()) | set(voice_emotions.keys())
for emotion in all_emotions:
face_score = face_emotions.get(emotion, 0)
voice_score = voice_emotions.get(emotion, 0)
combined[emotion] = weights[0] * face_score + weights[1] * voice_score
return combined
def map_to_clinical_emotions(self, emotions):
"""Map detected emotions to clinical categories"""
clinical_mapping = {
'stress': emotions.get('angry', 0) * 0.3 + emotions.get('fear', 0) * 0.4 + emotions.get('disgust', 0) * 0.3,
'anxiety': emotions.get('fear', 0) * 0.6 + emotions.get('surprise', 0) * 0.2 + emotions.get('sad', 0) * 0.2,
'pain': emotions.get('angry', 0) * 0.4 + emotions.get('disgust', 0) * 0.3 + emotions.get('sad', 0) * 0.3,
'confusion': emotions.get('surprise', 0) * 0.5 + emotions.get('neutral', 0) * 0.3 + emotions.get('fear', 0) * 0.2,
'comfort': emotions.get('happy', 0) * 0.7 + emotions.get('neutral', 0) * 0.3
}
return clinical_mapping
def generate_alerts(self, clinical_emotions):
"""Generate alerts based on emotion thresholds"""
alerts = []
suggestions = []
for emotion, score in clinical_emotions.items():
if emotion in self.alert_thresholds and score > self.alert_thresholds[emotion]:
alerts.append(f"⚠️ High {emotion} detected ({score:.2f})")
# Add specific suggestions
if emotion == 'stress':
suggestions.append("Consider: Take a moment to slow down, use calming voice tone")
elif emotion == 'anxiety':
suggestions.append("Consider: Provide reassurance, explain procedures clearly")
elif emotion == 'pain':
suggestions.append("Consider: Assess pain level, offer comfort measures")
elif emotion == 'confusion':
suggestions.append("Consider: Simplify explanations, check understanding")
return alerts, suggestions
def process_frame(self, frame, audio_data=None, sample_rate=16000):
"""Process a single frame and audio data"""
timestamp = datetime.now()
# Detect emotions
face_emotions = self.detect_face_emotion(frame)
voice_emotions = self.detect_voice_emotion(audio_data, sample_rate) if audio_data is not None else {}
# Combine emotions
if voice_emotions:
combined_emotions = self.combine_emotions(face_emotions, voice_emotions)
else:
combined_emotions = face_emotions
# Map to clinical categories
clinical_emotions = self.map_to_clinical_emotions(combined_emotions)
# Generate alerts
alerts, suggestions = self.generate_alerts(clinical_emotions)
# Store in history
emotion_record = {
'timestamp': timestamp,
'face_emotions': face_emotions,
'voice_emotions': voice_emotions,
'clinical_emotions': clinical_emotions,
'alerts': alerts,
'suggestions': suggestions
}
self.emotion_history.append(emotion_record)
return emotion_record
# Initialize the emotion recognition system
emotion_system = EmotionRecognitionSystem()
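# Minimal offline smoke test (a sketch, not part of the app flow): process_frame can
# be exercised without the Gradio UI; with no models loaded it returns mock scores.
#
#   dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8)
#   record = emotion_system.process_frame(dummy_frame)
#   print(record['clinical_emotions'], record['alerts'])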
def process_video_audio(video_frame, audio_data):
"""Process video frame and audio data"""
if video_frame is None:
return None, "No video input", "", ""
# Process the frame
sample_rate = 16000
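    # 16 kHz matches the rate wav2vec2-style speech emotion models expect.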
    if audio_data is not None:
        # Gradio's numpy audio format is (sample_rate, samples); samples arrive as int16.
        sr, audio_array = audio_data
        audio_array = audio_array.astype(np.float32) / 32768.0
        if audio_array.ndim > 1:
            audio_array = audio_array.mean(axis=1)  # Down-mix stereo to mono
        if LIBROSA_AVAILABLE and sr != sample_rate:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=sample_rate)
        elif not LIBROSA_AVAILABLE and sr != sample_rate:
            # Crude decimation fallback when librosa is unavailable
            step = max(sr // sample_rate, 1)
            audio_array = audio_array[::step]
else:
audio_array = None
# Get emotion analysis
emotion_record = emotion_system.process_frame(video_frame, audio_array, sample_rate)
# Create visualization
annotated_frame = create_emotion_overlay(video_frame, emotion_record)
# Format results
clinical_text = format_clinical_emotions(emotion_record['clinical_emotions'])
alerts_text = "\n".join(emotion_record['alerts']) if emotion_record['alerts'] else "No alerts"
suggestions_text = "\n".join(emotion_record['suggestions']) if emotion_record['suggestions'] else "No suggestions"
return annotated_frame, clinical_text, alerts_text, suggestions_text
def create_emotion_overlay(frame, emotion_record):
"""Add emotion information overlay to video frame"""
try:
if CV2_AVAILABLE:
annotated_frame = frame.copy()
# Get top emotion
clinical_emotions = emotion_record['clinical_emotions']
top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])
# Add text overlay
cv2.putText(annotated_frame, f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# Add alert indicator
if emotion_record['alerts']:
cv2.putText(annotated_frame, "ALERT!", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
return annotated_frame
elif PIL_AVAILABLE:
# Use PIL for image annotation
pil_image = Image.fromarray(frame)
draw = ImageDraw.Draw(pil_image)
# Get top emotion
clinical_emotions = emotion_record['clinical_emotions']
top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])
# Add text overlay
            try:
                font = ImageFont.load_default()
            except Exception:
                font = None
text = f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})"
draw.text((10, 10), text, fill=(0, 255, 0), font=font)
# Add alert indicator
if emotion_record['alerts']:
draw.text((10, 40), "ALERT!", fill=(255, 0, 0), font=font)
return np.array(pil_image)
else:
# Return original frame if no image processing available
return frame
except Exception as e:
print(f"Error creating emotion overlay: {e}")
return frame
def format_clinical_emotions(clinical_emotions):
"""Format clinical emotions for display"""
formatted = []
for emotion, score in clinical_emotions.items():
bar = "β–ˆ" * int(score * 10)
formatted.append(f"{emotion.capitalize()}: {bar} {score:.3f}")
return "\n".join(formatted)
def create_emotion_timeline():
"""Create emotion timeline chart"""
if not emotion_system.emotion_history:
return create_empty_chart()
# Extract data for plotting
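    # emotion_history is a deque with maxlen=100, so the chart shows a rolling
    # window of the most recent readings.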
timestamps = [record['timestamp'] for record in emotion_system.emotion_history]
fig = go.Figure()
# Add traces for each clinical emotion
clinical_emotions = ['stress', 'anxiety', 'pain', 'confusion', 'comfort']
colors = ['red', 'orange', 'purple', 'brown', 'green']
for emotion, color in zip(clinical_emotions, colors):
values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]
fig.add_trace(go.Scatter(
x=timestamps,
y=values,
mode='lines+markers',
name=emotion.capitalize(),
line=dict(color=color, width=2),
marker=dict(size=4)
))
fig.update_layout(
title="Patient Emotion Timeline",
xaxis_title="Time",
yaxis_title="Emotion Intensity",
height=400,
showlegend=True,
template="plotly_white"
)
return fig
def create_empty_chart():
"""Create empty chart when no data available"""
fig = go.Figure()
fig.add_annotation(
text="No emotion data available yet",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font=dict(size=16)
)
fig.update_layout(
title="Patient Emotion Timeline",
height=400,
template="plotly_white"
)
return fig
def get_session_summary():
"""Generate session summary"""
if not emotion_system.emotion_history:
return "No session data available"
# Calculate averages
avg_emotions = {}
total_alerts = 0
for emotion in ['stress', 'anxiety', 'pain', 'confusion', 'comfort']:
values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]
avg_emotions[emotion] = np.mean(values) if values else 0
total_alerts = sum(len(record['alerts']) for record in emotion_system.emotion_history)
# Format summary
summary = f"""
Session Summary:
- Duration: {len(emotion_system.emotion_history)} readings
- Average Stress Level: {avg_emotions['stress']:.3f}
- Average Anxiety Level: {avg_emotions['anxiety']:.3f}
- Average Pain Level: {avg_emotions['pain']:.3f}
- Average Confusion Level: {avg_emotions['confusion']:.3f}
- Average Comfort Level: {avg_emotions['comfort']:.3f}
- Total Alerts: {total_alerts}
Recommendations:
- Monitor stress levels during consultation
- Ensure patient understanding and comfort
- Address any recurring high emotion levels
"""
return summary
def clear_session():
"""Clear session data"""
emotion_system.emotion_history.clear()
return "Session data cleared", create_empty_chart(), ""
# Create Gradio interface
def create_interface():
with gr.Blocks(title="Patient Emotion Recognition System", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ₯ Real-Time Patient Emotion Recognition System
This system analyzes patient facial expressions and voice tone during consultations to detect emotions such as stress, anxiety, confusion, or pain.
""")
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("### πŸ“Ή Live Analysis")
                # Webcam input: gr.Image delivers a single numpy frame, which is what
                # process_video_audio expects (gr.Video would hand back a file path).
                video_input = gr.Image(
                    label="Video Feed",
                    sources=["webcam"],
                    type="numpy"
                )
                # Audio input (recorded clip delivered as (sample_rate, samples);
                # streaming is unnecessary for the click-driven workflow below)
                audio_input = gr.Audio(
                    label="Audio Input",
                    sources=["microphone"],
                    type="numpy"
                )
# Process button
process_btn = gr.Button("πŸ”„ Process Current Frame", variant="primary")
with gr.Column(scale=2):
gr.Markdown("### πŸ“Š Real-Time Results")
# Annotated video output
video_output = gr.Image(
label="Emotion Analysis",
type="numpy"
)
# Clinical emotions display
clinical_output = gr.Textbox(
label="Clinical Emotion Levels",
lines=6,
interactive=False
)
with gr.Row():
with gr.Column():
gr.Markdown("### ⚠️ Alerts")
alerts_output = gr.Textbox(
label="Current Alerts",
lines=3,
interactive=False
)
with gr.Column():
gr.Markdown("### πŸ’‘ Suggestions")
suggestions_output = gr.Textbox(
label="Practitioner Suggestions",
lines=3,
interactive=False
)
with gr.Row():
gr.Markdown("### πŸ“ˆ Emotion Timeline")
timeline_plot = gr.Plot(label="Emotion Timeline")
with gr.Row():
with gr.Column():
gr.Markdown("### πŸ“‹ Session Summary")
summary_output = gr.Textbox(
label="Session Summary",
lines=12,
interactive=False
)
with gr.Row():
update_summary_btn = gr.Button("πŸ“Š Update Summary")
clear_btn = gr.Button("πŸ—‘οΈ Clear Session", variant="secondary")
update_timeline_btn = gr.Button("πŸ”„ Update Timeline")
# Event handlers
process_btn.click(
fn=process_video_audio,
inputs=[video_input, audio_input],
outputs=[video_output, clinical_output, alerts_output, suggestions_output]
)
update_timeline_btn.click(
fn=create_emotion_timeline,
outputs=timeline_plot
)
update_summary_btn.click(
fn=get_session_summary,
outputs=summary_output
)
clear_btn.click(
fn=clear_session,
outputs=[summary_output, timeline_plot, clinical_output]
)
        # Populate the timeline once when the interface loads
demo.load(fn=create_emotion_timeline, outputs=timeline_plot)
gr.Markdown("""
### πŸ“ Usage Instructions:
1. **Enable camera and microphone** access when prompted
2. **Click "Process Current Frame"** to analyze emotions in real-time
3. **Monitor the timeline** to track emotion changes over time
4. **Review alerts and suggestions** for patient care recommendations
5. **Use session summary** for consultation documentation
### πŸ”§ Technical Notes:
- System uses pre-trained emotion recognition models
- Combines facial expression and voice tone analysis
- Provides clinical emotion mapping (stress, anxiety, pain, confusion)
- Generates real-time alerts and suggestions for practitioners
""")
return demo
# Launch the application
if __name__ == "__main__":
demo = create_interface()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)