import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import threading
import queue
import time
from collections import deque
import warnings

warnings.filterwarnings("ignore")

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
    print("OpenCV not available - using PIL for image processing")

try:
    import librosa
    LIBROSA_AVAILABLE = True
except ImportError:
    LIBROSA_AVAILABLE = False
    print("Librosa not available - using basic audio processing")

try:
    from transformers import pipeline
    import torch
    HF_AVAILABLE = True
except ImportError:
    HF_AVAILABLE = False
    print("Transformers not available - using mock emotion detection")

try:
    from PIL import Image, ImageDraw, ImageFont
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False


class EmotionRecognitionSystem:
    def __init__(self):
        self.emotion_history = deque(maxlen=100)
        self.audio_queue = queue.Queue()
        self.video_queue = queue.Queue()

        self.setup_models()

        # Scores above these thresholds trigger practitioner alerts
        self.alert_thresholds = {
            'stress': 0.7,
            'anxiety': 0.6,
            'pain': 0.8,
            'confusion': 0.5
        }

    def setup_models(self):
        """Initialize emotion recognition models"""
        if HF_AVAILABLE:
            try:
                # Facial-expression classifier. The checkpoint below is one
                # publicly available option; substitute any image-classification
                # model trained on facial emotions.
                self.face_emotion_pipeline = pipeline(
                    "image-classification",
                    model="trpakov/vit-face-expression",
                    device=0 if torch.cuda.is_available() else -1
                )

                # Speech emotion recognition on raw audio
                self.audio_emotion_pipeline = pipeline(
                    "audio-classification",
                    model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
                    device=0 if torch.cuda.is_available() else -1
                )
                self.models_loaded = True
            except Exception as e:
                print(f"Error loading models: {e}")
                self.models_loaded = False
        else:
            self.models_loaded = False

    def detect_face_emotion(self, frame):
        """Detect emotions from facial expressions"""
        if not self.models_loaded:
            # No model available: return mock scores so the UI still works
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise', 'disgust']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            if isinstance(frame, np.ndarray):
                if CV2_AVAILABLE:
                    # OpenCV frames are BGR; convert to RGB for the model
                    if len(frame.shape) == 3 and frame.shape[2] == 3:
                        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    else:
                        rgb_frame = frame
                else:
                    # Channel flip as a fallback when OpenCV is unavailable
                    if len(frame.shape) == 3 and frame.shape[2] == 3:
                        rgb_frame = frame[:, :, ::-1]
                    else:
                        rgb_frame = frame
            else:
                rgb_frame = frame

            # Image pipelines expect PIL images or file paths, so convert
            # numpy frames before inference
            if PIL_AVAILABLE and isinstance(rgb_frame, np.ndarray):
                rgb_frame = Image.fromarray(rgb_frame.astype(np.uint8))

            results = self.face_emotion_pipeline(rgb_frame)

            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores

        except Exception as e:
            print(f"Face emotion detection error: {e}")
            return {'neutral': 1.0}

    def detect_voice_emotion(self, audio_data, sample_rate=16000):
        """Detect emotions from voice tone"""
        if not self.models_loaded or audio_data is None:
            # No model or no audio: return mock scores
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            # Pass the raw waveform together with its sampling rate so the
            # pipeline can resample to the model's expected rate
            results = self.audio_emotion_pipeline(
                {"raw": np.asarray(audio_data, dtype=np.float32), "sampling_rate": sample_rate}
            )

            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores

        except Exception as e:
            print(f"Voice emotion detection error: {e}")
            return {'neutral': 1.0}

    def extract_audio_features(self, audio_data, sample_rate):
        """Extract audio features for emotion analysis"""
        if not LIBROSA_AVAILABLE:
            return {
                'mfcc_mean': np.random.random(),
                'mfcc_std': np.random.random(),
                'spectral_centroid_mean': np.random.random(),
                'zcr_mean': np.random.random(),
                'spectral_rolloff_mean': np.random.random()
            }

        try:
            mfccs = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=13)
            spectral_centroids = librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate)
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio_data)
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate)

            features = {
                'mfcc_mean': np.mean(mfccs),
                'mfcc_std': np.std(mfccs),
                'spectral_centroid_mean': np.mean(spectral_centroids),
                'zcr_mean': np.mean(zero_crossing_rate),
                'spectral_rolloff_mean': np.mean(spectral_rolloff)
            }

            return features
        except Exception as e:
            print(f"Audio feature extraction error: {e}")
            return {}

    def combine_emotions(self, face_emotions, voice_emotions, weights=(0.6, 0.4)):
        """Combine facial and voice emotion predictions"""
        combined = {}
        all_emotions = set(face_emotions.keys()) | set(voice_emotions.keys())

        for emotion in all_emotions:
            face_score = face_emotions.get(emotion, 0)
            voice_score = voice_emotions.get(emotion, 0)
            combined[emotion] = weights[0] * face_score + weights[1] * voice_score

        return combined

    def map_to_clinical_emotions(self, emotions):
        """Map detected emotions to clinical categories"""
        clinical_mapping = {
            'stress': emotions.get('angry', 0) * 0.3 + emotions.get('fear', 0) * 0.4 + emotions.get('disgust', 0) * 0.3,
            'anxiety': emotions.get('fear', 0) * 0.6 + emotions.get('surprise', 0) * 0.2 + emotions.get('sad', 0) * 0.2,
            'pain': emotions.get('angry', 0) * 0.4 + emotions.get('disgust', 0) * 0.3 + emotions.get('sad', 0) * 0.3,
            'confusion': emotions.get('surprise', 0) * 0.5 + emotions.get('neutral', 0) * 0.3 + emotions.get('fear', 0) * 0.2,
            'comfort': emotions.get('happy', 0) * 0.7 + emotions.get('neutral', 0) * 0.3
        }

        return clinical_mapping

    def generate_alerts(self, clinical_emotions):
        """Generate alerts based on emotion thresholds"""
        alerts = []
        suggestions = []

        for emotion, score in clinical_emotions.items():
            if emotion in self.alert_thresholds and score > self.alert_thresholds[emotion]:
                alerts.append(f"⚠️ High {emotion} detected ({score:.2f})")

                if emotion == 'stress':
                    suggestions.append("Consider: Take a moment to slow down, use a calming voice tone")
                elif emotion == 'anxiety':
                    suggestions.append("Consider: Provide reassurance, explain procedures clearly")
                elif emotion == 'pain':
                    suggestions.append("Consider: Assess pain level, offer comfort measures")
                elif emotion == 'confusion':
                    suggestions.append("Consider: Simplify explanations, check understanding")

        return alerts, suggestions

    def process_frame(self, frame, audio_data=None, sample_rate=16000):
        """Process a single frame and audio data"""
        timestamp = datetime.now()

        face_emotions = self.detect_face_emotion(frame)
        voice_emotions = self.detect_voice_emotion(audio_data, sample_rate) if audio_data is not None else {}

        if voice_emotions:
            combined_emotions = self.combine_emotions(face_emotions, voice_emotions)
        else:
            combined_emotions = face_emotions

        clinical_emotions = self.map_to_clinical_emotions(combined_emotions)

        alerts, suggestions = self.generate_alerts(clinical_emotions)

        emotion_record = {
            'timestamp': timestamp,
            'face_emotions': face_emotions,
            'voice_emotions': voice_emotions,
            'clinical_emotions': clinical_emotions,
            'alerts': alerts,
            'suggestions': suggestions
        }

        self.emotion_history.append(emotion_record)

        return emotion_record


# Global system instance shared by the Gradio callbacks
emotion_system = EmotionRecognitionSystem()
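

# Illustrative sketch, not called by the UI: shows how the weighted fusion,
# clinical mapping, and alerting steps compose. The helper name and the
# example scores below are hypothetical.
def _fusion_example():
    face = {'happy': 0.2, 'fear': 0.6, 'neutral': 0.2}   # assumed facial scores
    voice = {'fear': 0.7, 'sad': 0.3}                    # assumed voice scores
    combined = emotion_system.combine_emotions(face, voice)       # 0.6 * face + 0.4 * voice
    clinical = emotion_system.map_to_clinical_emotions(combined)  # stress/anxiety/pain/confusion/comfort
    return emotion_system.generate_alerts(clinical)               # -> (alerts, suggestions)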


def process_video_audio(video_frame, audio_data):
    """Process a video frame and audio data"""
    if video_frame is None:
        return None, "No video input", "", ""

    sample_rate = 16000
    if audio_data is not None:
        # gr.Audio with type="numpy" yields a (sample_rate, samples) tuple
        sr, audio_array = audio_data
        audio_array = np.asarray(audio_array, dtype=np.float32)
        if audio_array.ndim > 1:
            audio_array = audio_array.mean(axis=1)  # downmix stereo to mono
        if audio_array.size and np.abs(audio_array).max() > 1.0:
            audio_array = audio_array / 32768.0  # scale 16-bit PCM to [-1, 1]
        if LIBROSA_AVAILABLE and sr != sample_rate:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=sample_rate)
        elif not LIBROSA_AVAILABLE and sr != sample_rate:
            # Crude fallback: integer decimation when librosa is unavailable
            step = sr // sample_rate
            audio_array = audio_array[::step] if step > 1 else audio_array
    else:
        audio_array = None

    emotion_record = emotion_system.process_frame(video_frame, audio_array, sample_rate)

    annotated_frame = create_emotion_overlay(video_frame, emotion_record)

    clinical_text = format_clinical_emotions(emotion_record['clinical_emotions'])
    alerts_text = "\n".join(emotion_record['alerts']) if emotion_record['alerts'] else "No alerts"
    suggestions_text = "\n".join(emotion_record['suggestions']) if emotion_record['suggestions'] else "No suggestions"

    return annotated_frame, clinical_text, alerts_text, suggestions_text


def create_emotion_overlay(frame, emotion_record):
    """Add emotion information overlay to video frame"""
    try:
        if CV2_AVAILABLE:
            annotated_frame = frame.copy()

            clinical_emotions = emotion_record['clinical_emotions']
            top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])

            cv2.putText(annotated_frame, f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            if emotion_record['alerts']:
                cv2.putText(annotated_frame, "ALERT!", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            return annotated_frame

        elif PIL_AVAILABLE:
            # PIL fallback when OpenCV is unavailable
            pil_image = Image.fromarray(np.asarray(frame).astype(np.uint8))
            draw = ImageDraw.Draw(pil_image)

            clinical_emotions = emotion_record['clinical_emotions']
            top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])

            try:
                font = ImageFont.load_default()
            except Exception:
                font = None

            text = f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})"
            draw.text((10, 10), text, fill=(0, 255, 0), font=font)

            if emotion_record['alerts']:
                draw.text((10, 40), "ALERT!", fill=(255, 0, 0), font=font)

            return np.array(pil_image)

        else:
            # No drawing backend available; return the frame unchanged
            return frame

    except Exception as e:
        print(f"Error creating emotion overlay: {e}")
        return frame


def format_clinical_emotions(clinical_emotions):
    """Format clinical emotions for display"""
    formatted = []
    for emotion, score in clinical_emotions.items():
        bar = "█" * int(score * 10)
        formatted.append(f"{emotion.capitalize()}: {bar} {score:.3f}")
    return "\n".join(formatted)


def create_emotion_timeline():
    """Create emotion timeline chart"""
    if not emotion_system.emotion_history:
        return create_empty_chart()

    timestamps = [record['timestamp'] for record in emotion_system.emotion_history]

    fig = go.Figure()

    clinical_emotions = ['stress', 'anxiety', 'pain', 'confusion', 'comfort']
    colors = ['red', 'orange', 'purple', 'brown', 'green']

    for emotion, color in zip(clinical_emotions, colors):
        values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]
        fig.add_trace(go.Scatter(
            x=timestamps,
            y=values,
            mode='lines+markers',
            name=emotion.capitalize(),
            line=dict(color=color, width=2),
            marker=dict(size=4)
        ))

    fig.update_layout(
        title="Patient Emotion Timeline",
        xaxis_title="Time",
        yaxis_title="Emotion Intensity",
        height=400,
        showlegend=True,
        template="plotly_white"
    )

    return fig


def create_empty_chart():
    """Create empty chart when no data available"""
    fig = go.Figure()
    fig.add_annotation(
        text="No emotion data available yet",
        xref="paper", yref="paper",
        x=0.5, y=0.5, xanchor='center', yanchor='middle',
        showarrow=False, font=dict(size=16)
    )
    fig.update_layout(
        title="Patient Emotion Timeline",
        height=400,
        template="plotly_white"
    )
    return fig


def get_session_summary():
    """Generate a session summary"""
    if not emotion_system.emotion_history:
        return "No session data available"

    avg_emotions = {}

    for emotion in ['stress', 'anxiety', 'pain', 'confusion', 'comfort']:
        values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]
        avg_emotions[emotion] = np.mean(values) if values else 0

    total_alerts = sum(len(record['alerts']) for record in emotion_system.emotion_history)

    summary = f"""
Session Summary:
- Duration: {len(emotion_system.emotion_history)} readings
- Average Stress Level: {avg_emotions['stress']:.3f}
- Average Anxiety Level: {avg_emotions['anxiety']:.3f}
- Average Pain Level: {avg_emotions['pain']:.3f}
- Average Confusion Level: {avg_emotions['confusion']:.3f}
- Average Comfort Level: {avg_emotions['comfort']:.3f}
- Total Alerts: {total_alerts}

Recommendations:
- Monitor stress levels during the consultation
- Ensure patient understanding and comfort
- Address any recurring high emotion levels
"""

    return summary


def clear_session():
    """Clear session data"""
    emotion_system.emotion_history.clear()
    return "Session data cleared", create_empty_chart(), ""


def create_interface():
    with gr.Blocks(title="Patient Emotion Recognition System", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🏥 Real-Time Patient Emotion Recognition System

        This system analyzes patient facial expressions and voice tone during consultations to detect emotions such as stress, anxiety, confusion, or pain.
        """)

        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("### 📹 Live Analysis")

                # Capture a webcam snapshot as a numpy array so the button
                # callback can pass it straight to the emotion pipeline
                video_input = gr.Image(
                    label="Video Feed",
                    sources=["webcam"],
                    type="numpy"
                )

                audio_input = gr.Audio(
                    label="Audio Input",
                    sources=["microphone"],
                    type="numpy"
                )

                process_btn = gr.Button("🔍 Process Current Frame", variant="primary")

            with gr.Column(scale=2):
                gr.Markdown("### 📊 Real-Time Results")

                video_output = gr.Image(
                    label="Emotion Analysis",
                    type="numpy"
                )

                clinical_output = gr.Textbox(
                    label="Clinical Emotion Levels",
                    lines=6,
                    interactive=False
                )

        with gr.Row():
            with gr.Column():
                gr.Markdown("### ⚠️ Alerts")
                alerts_output = gr.Textbox(
                    label="Current Alerts",
                    lines=3,
                    interactive=False
                )

            with gr.Column():
                gr.Markdown("### 💡 Suggestions")
                suggestions_output = gr.Textbox(
                    label="Practitioner Suggestions",
                    lines=3,
                    interactive=False
                )

        with gr.Row():
            gr.Markdown("### 📈 Emotion Timeline")
            timeline_plot = gr.Plot(label="Emotion Timeline")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📋 Session Summary")
                summary_output = gr.Textbox(
                    label="Session Summary",
                    lines=12,
                    interactive=False
                )

        with gr.Row():
            update_summary_btn = gr.Button("🔄 Update Summary")
            clear_btn = gr.Button("🗑️ Clear Session", variant="secondary")
            update_timeline_btn = gr.Button("📈 Update Timeline")

        process_btn.click(
            fn=process_video_audio,
            inputs=[video_input, audio_input],
            outputs=[video_output, clinical_output, alerts_output, suggestions_output]
        )

        update_timeline_btn.click(
            fn=create_emotion_timeline,
            outputs=timeline_plot
        )

        update_summary_btn.click(
            fn=get_session_summary,
            outputs=summary_output
        )

        clear_btn.click(
            fn=clear_session,
            outputs=[summary_output, timeline_plot, clinical_output]
        )

        demo.load(fn=create_emotion_timeline, outputs=timeline_plot)

        gr.Markdown("""
        ### 📋 Usage Instructions:
        1. **Enable camera and microphone** access when prompted
        2. **Click "Process Current Frame"** to analyze emotions in real time
        3. **Monitor the timeline** to track emotion changes over time
        4. **Review alerts and suggestions** for patient care recommendations
        5. **Use the session summary** for consultation documentation

        ### 🔧 Technical Notes:
        - The system uses pre-trained emotion recognition models
        - Combines facial expression and voice tone analysis
        - Provides clinical emotion mapping (stress, anxiety, pain, confusion)
        - Generates real-time alerts and suggestions for practitioners
        """)

    return demo


if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )