import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import threading
import queue
import time
from collections import deque
import warnings

warnings.filterwarnings("ignore")

# Try to import OpenCV with fallback
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
    print("OpenCV not available - using PIL for image processing")

# Try to import librosa with fallback
try:
    import librosa
    LIBROSA_AVAILABLE = True
except ImportError:
    LIBROSA_AVAILABLE = False
    print("Librosa not available - using basic audio processing")

# Try to import transformers and torch, with fallbacks
try:
    from transformers import pipeline
    import torch
    HF_AVAILABLE = True
except ImportError:
    HF_AVAILABLE = False
    print("Transformers not available - using mock emotion detection")

# Additional imports for image processing if OpenCV fails
try:
    from PIL import Image, ImageDraw, ImageFont
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False


class EmotionRecognitionSystem:
    def __init__(self):
        self.emotion_history = deque(maxlen=100)  # Store last 100 emotion readings
        self.audio_queue = queue.Queue()
        self.video_queue = queue.Queue()

        # Initialize emotion detection models
        self.setup_models()

        # Emotion thresholds for alerts
        self.alert_thresholds = {
            'stress': 0.7,
            'anxiety': 0.6,
            'pain': 0.8,
            'confusion': 0.5
        }

    def setup_models(self):
        """Initialize emotion recognition models"""
        if HF_AVAILABLE:
            try:
                # Facial emotion recognition
                # NOTE: "j-hartmann/emotion-english-distilroberta-base" is a text
                # emotion model; substitute an image-classification model trained on
                # facial expressions. If loading fails, the system falls back to the
                # mock detection paths below.
                self.face_emotion_pipeline = pipeline(
                    "image-classification",
                    model="j-hartmann/emotion-english-distilroberta-base",
                    device=0 if torch.cuda.is_available() else -1
                )

                # Audio emotion recognition
                self.audio_emotion_pipeline = pipeline(
                    "audio-classification",
                    model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
                    device=0 if torch.cuda.is_available() else -1
                )

                self.models_loaded = True
            except Exception as e:
                print(f"Error loading models: {e}")
                self.models_loaded = False
        else:
            self.models_loaded = False

    def detect_face_emotion(self, frame):
        """Detect emotions from facial expressions"""
        if not self.models_loaded:
            # Mock emotion detection for demo
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise', 'disgust']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            # Handle different image formats
            if isinstance(frame, np.ndarray):
                if CV2_AVAILABLE:
                    # Convert frame to RGB if it's BGR
                    if len(frame.shape) == 3 and frame.shape[2] == 3:
                        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    else:
                        rgb_frame = frame
                else:
                    # Use numpy operations for color conversion
                    if len(frame.shape) == 3 and frame.shape[2] == 3:
                        rgb_frame = frame[:, :, ::-1]  # BGR to RGB
                    else:
                        rgb_frame = frame
            else:
                rgb_frame = frame

            # Transformers image pipelines accept PIL images; convert numpy frames
            # before inference
            if PIL_AVAILABLE and isinstance(rgb_frame, np.ndarray):
                rgb_frame = Image.fromarray(np.ascontiguousarray(rgb_frame, dtype=np.uint8))

            # Use face emotion model
            results = self.face_emotion_pipeline(rgb_frame)

            # Convert to standardized format
            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores
        except Exception as e:
            print(f"Face emotion detection error: {e}")
            return {'neutral': 1.0}

    def detect_voice_emotion(self, audio_data, sample_rate=16000):
        """Detect emotions from voice tone"""
        if not self.models_loaded or audio_data is None:
            # Mock emotion detection
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            # Process audio with the model (expects mono float audio at 16 kHz)
            results = self.audio_emotion_pipeline(audio_data)

            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores
        except Exception as e:
            print(f"Voice emotion detection error: {e}")
            return {'neutral': 1.0}
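    # Both detectors return a {label: score} dict, e.g. (hypothetical values)
    # {'neutral': 0.62, 'sad': 0.21, 'fear': 0.09, 'happy': 0.05, 'angry': 0.03},
    # so the fusion and clinical-mapping steps below stay independent of the
    # underlying models.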
    def extract_audio_features(self, audio_data, sample_rate):
        """Extract audio features for emotion analysis"""
        if not LIBROSA_AVAILABLE:
            # Return mock features if librosa is not available
            return {
                'mfcc_mean': np.random.random(),
                'mfcc_std': np.random.random(),
                'spectral_centroid_mean': np.random.random(),
                'zcr_mean': np.random.random(),
                'spectral_rolloff_mean': np.random.random()
            }

        try:
            # Extract basic audio features
            mfccs = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=13)
            spectral_centroids = librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate)
            zero_crossing_rate = librosa.feature.zero_crossing_rate(audio_data)
            spectral_rolloff = librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate)

            features = {
                'mfcc_mean': np.mean(mfccs),
                'mfcc_std': np.std(mfccs),
                'spectral_centroid_mean': np.mean(spectral_centroids),
                'zcr_mean': np.mean(zero_crossing_rate),
                'spectral_rolloff_mean': np.mean(spectral_rolloff)
            }

            return features
        except Exception as e:
            print(f"Audio feature extraction error: {e}")
            return {}

    def combine_emotions(self, face_emotions, voice_emotions, weights=(0.6, 0.4)):
        """Combine facial and voice emotion predictions"""
        combined = {}
        all_emotions = set(face_emotions.keys()) | set(voice_emotions.keys())

        for emotion in all_emotions:
            face_score = face_emotions.get(emotion, 0)
            voice_score = voice_emotions.get(emotion, 0)
            combined[emotion] = weights[0] * face_score + weights[1] * voice_score

        return combined

    def map_to_clinical_emotions(self, emotions):
        """Map detected emotions to clinical categories"""
        clinical_mapping = {
            'stress': emotions.get('angry', 0) * 0.3 + emotions.get('fear', 0) * 0.4 + emotions.get('disgust', 0) * 0.3,
            'anxiety': emotions.get('fear', 0) * 0.6 + emotions.get('surprise', 0) * 0.2 + emotions.get('sad', 0) * 0.2,
            'pain': emotions.get('angry', 0) * 0.4 + emotions.get('disgust', 0) * 0.3 + emotions.get('sad', 0) * 0.3,
            'confusion': emotions.get('surprise', 0) * 0.5 + emotions.get('neutral', 0) * 0.3 + emotions.get('fear', 0) * 0.2,
            'comfort': emotions.get('happy', 0) * 0.7 + emotions.get('neutral', 0) * 0.3
        }

        return clinical_mapping
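    # Worked example with hypothetical fused scores {'fear': 0.5, 'sad': 0.3,
    # 'surprise': 0.1, 'neutral': 0.1}:
    #   anxiety   = 0.6*0.5 + 0.2*0.1 + 0.2*0.3 = 0.38
    #   confusion = 0.5*0.1 + 0.3*0.1 + 0.2*0.5 = 0.18
    # Each clinical category is a fixed weighted sum of the raw emotion scores,
    # so its value stays in [0, 1] when the inputs are probabilities.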
    def generate_alerts(self, clinical_emotions):
        """Generate alerts based on emotion thresholds"""
        alerts = []
        suggestions = []

        for emotion, score in clinical_emotions.items():
            if emotion in self.alert_thresholds and score > self.alert_thresholds[emotion]:
                alerts.append(f"⚠️ High {emotion} detected ({score:.2f})")

                # Add specific suggestions
                if emotion == 'stress':
                    suggestions.append("Consider: Take a moment to slow down, use calming voice tone")
                elif emotion == 'anxiety':
                    suggestions.append("Consider: Provide reassurance, explain procedures clearly")
                elif emotion == 'pain':
                    suggestions.append("Consider: Assess pain level, offer comfort measures")
                elif emotion == 'confusion':
                    suggestions.append("Consider: Simplify explanations, check understanding")

        return alerts, suggestions

    def process_frame(self, frame, audio_data=None, sample_rate=16000):
        """Process a single frame and audio data"""
        timestamp = datetime.now()

        # Detect emotions
        face_emotions = self.detect_face_emotion(frame)
        voice_emotions = self.detect_voice_emotion(audio_data, sample_rate) if audio_data is not None else {}

        # Combine emotions
        if voice_emotions:
            combined_emotions = self.combine_emotions(face_emotions, voice_emotions)
        else:
            combined_emotions = face_emotions

        # Map to clinical categories
        clinical_emotions = self.map_to_clinical_emotions(combined_emotions)

        # Generate alerts
        alerts, suggestions = self.generate_alerts(clinical_emotions)

        # Store in history
        emotion_record = {
            'timestamp': timestamp,
            'face_emotions': face_emotions,
            'voice_emotions': voice_emotions,
            'clinical_emotions': clinical_emotions,
            'alerts': alerts,
            'suggestions': suggestions
        }

        self.emotion_history.append(emotion_record)

        return emotion_record


# Initialize the emotion recognition system
emotion_system = EmotionRecognitionSystem()


def process_video_audio(video_frame, audio_data):
    """Process video frame and audio data"""
    if video_frame is None:
        return None, "No video input", "", ""

    # Prepare the audio (Gradio's numpy audio format is (sample_rate, data))
    sample_rate = 16000
    if audio_data is not None:
        sr, audio_array = audio_data
        audio_array = np.asarray(audio_array)
        if audio_array.ndim > 1:
            audio_array = audio_array.mean(axis=1)  # Down-mix to mono
        if np.issubdtype(audio_array.dtype, np.integer):
            audio_array = audio_array.astype(np.float32) / 32768.0  # int16 PCM -> [-1, 1]
        else:
            audio_array = audio_array.astype(np.float32)

        if LIBROSA_AVAILABLE and sr != sample_rate:
            audio_array = librosa.resample(audio_array, orig_sr=sr, target_sr=sample_rate)
        elif not LIBROSA_AVAILABLE and sr != sample_rate:
            # Simple decimation if librosa is not available
            step = sr // sample_rate
            audio_array = audio_array[::step] if step > 1 else audio_array
    else:
        audio_array = None

    # Get emotion analysis
    emotion_record = emotion_system.process_frame(video_frame, audio_array, sample_rate)

    # Create visualization
    annotated_frame = create_emotion_overlay(video_frame, emotion_record)

    # Format results
    clinical_text = format_clinical_emotions(emotion_record['clinical_emotions'])
    alerts_text = "\n".join(emotion_record['alerts']) if emotion_record['alerts'] else "No alerts"
    suggestions_text = "\n".join(emotion_record['suggestions']) if emotion_record['suggestions'] else "No suggestions"

    return annotated_frame, clinical_text, alerts_text, suggestions_text


def create_emotion_overlay(frame, emotion_record):
    """Add emotion information overlay to video frame"""
    try:
        if CV2_AVAILABLE:
            annotated_frame = frame.copy()

            # Get top emotion
            clinical_emotions = emotion_record['clinical_emotions']
            top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])

            # Add text overlay
            cv2.putText(annotated_frame, f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Add alert indicator
            if emotion_record['alerts']:
                cv2.putText(annotated_frame, "ALERT!", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            return annotated_frame
        elif PIL_AVAILABLE:
            # Use PIL for image annotation
            pil_image = Image.fromarray(frame)
            draw = ImageDraw.Draw(pil_image)

            # Get top emotion
            clinical_emotions = emotion_record['clinical_emotions']
            top_emotion = max(clinical_emotions.items(), key=lambda x: x[1])

            # Add text overlay
            try:
                font = ImageFont.load_default()
            except Exception:
                font = None

            text = f"Primary: {top_emotion[0]} ({top_emotion[1]:.2f})"
            draw.text((10, 10), text, fill=(0, 255, 0), font=font)

            # Add alert indicator
            if emotion_record['alerts']:
                draw.text((10, 40), "ALERT!", fill=(255, 0, 0), font=font)

            return np.array(pil_image)
        else:
            # Return original frame if no image processing available
            return frame
    except Exception as e:
        print(f"Error creating emotion overlay: {e}")
        return frame


def format_clinical_emotions(clinical_emotions):
    """Format clinical emotions for display"""
    formatted = []
    for emotion, score in clinical_emotions.items():
        bar = "█" * int(score * 10)
        formatted.append(f"{emotion.capitalize()}: {bar} {score:.3f}")

    return "\n".join(formatted)
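# Example output of format_clinical_emotions for hypothetical scores
# {'stress': 0.42, 'comfort': 0.81}:
#   Stress: ████ 0.420
#   Comfort: ████████ 0.810


# Illustrative helper, not wired into the UI: the session history can be
# flattened into a pandas DataFrame for export or offline review. The column
# layout follows the record structure built in process_frame; the function
# name is an addition, not part of the original app.
def emotion_history_to_dataframe():
    """Return the session's clinical emotion history as a DataFrame."""
    rows = []
    for record in emotion_system.emotion_history:
        row = {'timestamp': record['timestamp']}
        row.update(record['clinical_emotions'])
        row['alert_count'] = len(record['alerts'])
        rows.append(row)
    return pd.DataFrame(rows)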
def create_emotion_timeline():
    """Create emotion timeline chart"""
    if not emotion_system.emotion_history:
        return create_empty_chart()

    # Extract data for plotting
    timestamps = [record['timestamp'] for record in emotion_system.emotion_history]

    fig = go.Figure()

    # Add traces for each clinical emotion
    clinical_emotions = ['stress', 'anxiety', 'pain', 'confusion', 'comfort']
    colors = ['red', 'orange', 'purple', 'brown', 'green']

    for emotion, color in zip(clinical_emotions, colors):
        values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]

        fig.add_trace(go.Scatter(
            x=timestamps,
            y=values,
            mode='lines+markers',
            name=emotion.capitalize(),
            line=dict(color=color, width=2),
            marker=dict(size=4)
        ))

    fig.update_layout(
        title="Patient Emotion Timeline",
        xaxis_title="Time",
        yaxis_title="Emotion Intensity",
        height=400,
        showlegend=True,
        template="plotly_white"
    )

    return fig


def create_empty_chart():
    """Create empty chart when no data available"""
    fig = go.Figure()
    fig.add_annotation(
        text="No emotion data available yet",
        xref="paper", yref="paper",
        x=0.5, y=0.5,
        xanchor='center', yanchor='middle',
        showarrow=False,
        font=dict(size=16)
    )
    fig.update_layout(
        title="Patient Emotion Timeline",
        height=400,
        template="plotly_white"
    )
    return fig


def get_session_summary():
    """Generate session summary"""
    if not emotion_system.emotion_history:
        return "No session data available"

    # Calculate averages
    avg_emotions = {}

    for emotion in ['stress', 'anxiety', 'pain', 'confusion', 'comfort']:
        values = [record['clinical_emotions'].get(emotion, 0) for record in emotion_system.emotion_history]
        avg_emotions[emotion] = np.mean(values) if values else 0

    total_alerts = sum(len(record['alerts']) for record in emotion_system.emotion_history)

    # Format summary
    summary = f"""
Session Summary:
- Duration: {len(emotion_system.emotion_history)} readings
- Average Stress Level: {avg_emotions['stress']:.3f}
- Average Anxiety Level: {avg_emotions['anxiety']:.3f}
- Average Pain Level: {avg_emotions['pain']:.3f}
- Average Confusion Level: {avg_emotions['confusion']:.3f}
- Average Comfort Level: {avg_emotions['comfort']:.3f}
- Total Alerts: {total_alerts}

Recommendations:
- Monitor stress levels during consultation
- Ensure patient understanding and comfort
- Address any recurring high emotion levels
"""

    return summary


def clear_session():
    """Clear session data"""
    emotion_system.emotion_history.clear()
    return "Session data cleared", create_empty_chart(), ""


# Create Gradio interface
def create_interface():
    with gr.Blocks(title="Patient Emotion Recognition System", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🏥 Real-Time Patient Emotion Recognition System

        This system analyzes patient facial expressions and voice tone during consultations
        to detect emotions such as stress, anxiety, confusion, or pain.
        """)

        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("### 📹 Live Analysis")

                # Webcam input: a single snapshot as a numpy RGB frame, which is
                # what the processing functions expect (not a video file path)
                video_input = gr.Image(
                    label="Video Feed",
                    sources=["webcam"],
                    type="numpy"
                )

                # Audio input: the recorded clip is analyzed when the button is clicked
                audio_input = gr.Audio(
                    label="Audio Input",
                    sources=["microphone"],
                    type="numpy"
                )

                # Process button
                process_btn = gr.Button("🔄 Process Current Frame", variant="primary")

            with gr.Column(scale=2):
                gr.Markdown("### 📊 Real-Time Results")

                # Annotated video output
                video_output = gr.Image(
                    label="Emotion Analysis",
                    type="numpy"
                )

                # Clinical emotions display
                clinical_output = gr.Textbox(
                    label="Clinical Emotion Levels",
                    lines=6,
                    interactive=False
                )

        with gr.Row():
            with gr.Column():
                gr.Markdown("### ⚠️ Alerts")
                alerts_output = gr.Textbox(
                    label="Current Alerts",
                    lines=3,
                    interactive=False
                )

            with gr.Column():
                gr.Markdown("### 💡 Suggestions")
                suggestions_output = gr.Textbox(
                    label="Practitioner Suggestions",
                    lines=3,
                    interactive=False
                )

        with gr.Row():
            gr.Markdown("### 📈 Emotion Timeline")
            timeline_plot = gr.Plot(label="Emotion Timeline")

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📋 Session Summary")
                summary_output = gr.Textbox(
                    label="Session Summary",
                    lines=12,
                    interactive=False
                )

        with gr.Row():
            update_summary_btn = gr.Button("📊 Update Summary")
            clear_btn = gr.Button("🗑️ Clear Session", variant="secondary")
            update_timeline_btn = gr.Button("🔄 Update Timeline")

        # Event handlers
        process_btn.click(
            fn=process_video_audio,
            inputs=[video_input, audio_input],
            outputs=[video_output, clinical_output, alerts_output, suggestions_output]
        )

        update_timeline_btn.click(
            fn=create_emotion_timeline,
            outputs=timeline_plot
        )

        update_summary_btn.click(
            fn=get_session_summary,
            outputs=summary_output
        )

        clear_btn.click(
            fn=clear_session,
            outputs=[summary_output, timeline_plot, clinical_output]
        )

        # Populate the timeline when the page loads
        demo.load(fn=create_emotion_timeline, outputs=timeline_plot)

        gr.Markdown("""
        ### 📝 Usage Instructions:
        1. **Enable camera and microphone** access when prompted
        2. **Click "Process Current Frame"** to analyze emotions in real-time
        3. **Monitor the timeline** to track emotion changes over time
        4. **Review alerts and suggestions** for patient care recommendations
        5. **Use session summary** for consultation documentation

        ### 🔧 Technical Notes:
        - System uses pre-trained emotion recognition models
        - Combines facial expression and voice tone analysis
        - Provides clinical emotion mapping (stress, anxiety, pain, confusion)
        - Generates real-time alerts and suggestions for practitioners
        """)

    return demo


# Launch the application
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )