import gradio as gr
import numpy as np
import cv2
import pandas as pd
from datetime import datetime
import time
import librosa
from python_speech_features import mfcc
import onnxruntime as ort
import requests
import os
from sklearn.preprocessing import StandardScaler

# Constants
MODEL_URL = "https://github.com/onnx/models/raw/main/vision/body_analysis/emotion_ferplus/model/emotion-ferplus-8.onnx"
MODEL_PATH = "emotion-ferplus-8.onnx"
MODEL_CHECKSUM_SIZE = 2483870  # Expected file size in bytes for verification


class EmotionModel:
    def __init__(self):
        self.session = None
        self.labels = ['neutral', 'happy', 'surprise', 'sad', 'angry', 'disgust', 'fear', 'contempt']
        self.load_model()

    def download_model(self):
        try:
            print("Downloading emotion recognition model...")
            response = requests.get(MODEL_URL, stream=True, timeout=30)
            response.raise_for_status()
            with open(MODEL_PATH, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            # Verify download
            if os.path.exists(MODEL_PATH):
                actual_size = os.path.getsize(MODEL_PATH)
                if actual_size != MODEL_CHECKSUM_SIZE:
                    print(f"Warning: downloaded file size {actual_size} doesn't match expected size {MODEL_CHECKSUM_SIZE}")
                return True
            return False
        except Exception as e:
            print(f"Download failed: {str(e)}")
            return False

    def load_model(self):
        if not os.path.exists(MODEL_PATH):
            if not self.download_model():
                print("Using dummy emotion model")
                self.session = DummyEmotionSession()
                return
        try:
            so = ort.SessionOptions()
            so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
            self.session = ort.InferenceSession(MODEL_PATH, so)
            print("Emotion model loaded successfully")
        except Exception as e:
            print(f"Failed to load ONNX model: {str(e)}")
            print("Using dummy emotion model")
            self.session = DummyEmotionSession()

    def predict(self, frame):
        return self.session.run(None, {'Input3': frame})[0]


class DummyEmotionSession:
    def run(self, *args, **kwargs):
        # Return mostly neutral scores with slight random variation
        base = np.array([0.8] + [0.1] * 7)
        variation = np.random.normal(0, 0.01, size=8)
        return [np.clip(base + variation, 0, 1).reshape(1, -1)]


class VoiceEmotionClassifier:
    def __init__(self):
        self.scaler = StandardScaler()
        # Fit the scaler on random dummy data so transform() works;
        # replace with statistics from a real training set for meaningful scaling
        dummy_features = np.random.randn(100, 13)
        self.scaler.fit(dummy_features)

    def extract_features(self, audio):
        try:
            sr, y = audio
            y = y.astype(np.float32)
            if y.ndim > 1:
                # Gradio delivers stereo as (samples, channels); average channels to mono
                y = np.mean(y, axis=1)
            if sr != 16000:
                # Resample if needed
                y = librosa.resample(y, orig_sr=sr, target_sr=16000)
                sr = 16000
            mfcc_features = mfcc(y, sr, numcep=13)
            return np.mean(mfcc_features, axis=0)
        except Exception as e:
            print(f"Feature extraction error: {str(e)}")
            return np.zeros(13)

    def predict(self, audio):
        try:
            features = self.extract_features(audio).reshape(1, -1)
            features = self.scaler.transform(features)
            # Simple rule-based classifier (replace with an actual trained model)
            if features[0, 0] > 1.0:
                return "happy", [{"label": "happy", "score": 0.8}]
            elif features[0, 0] < -1.0:
                return "sad", [{"label": "sad", "score": 0.7}]
            elif abs(features[0, 1]) > 0.8:
                return "angry", [{"label": "angry", "score": 0.6}]
            else:
                return "neutral", [{"label": "neutral", "score": 0.9}]
        except Exception as e:
            print(f"Voice prediction error: {str(e)}")
            return "neutral", [{"label": "neutral", "score": 1.0}]


# Initialize models
emotion_model = EmotionModel()
voice_classifier = VoiceEmotionClassifier()

# Global variables to store results
emotion_history = []
current_emotions = {"face": "neutral", "voice": "neutral"}
last_update_time = time.time()


def analyze_face(frame):
    """Analyze facial expressions in the frame using the ONNX model"""
    try:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)

        if len(faces) > 0:
            x, y, w, h = faces[0]
            face_roi = gray[y:y+h, x:x+w]
            face_roi = cv2.resize(face_roi, (64, 64))
            face_roi = face_roi.astype('float32') / 255.0
            face_roi = np.expand_dims(face_roi, axis=(0, 1))  # shape (1, 1, 64, 64)

            results = emotion_model.predict(face_roi)
            emotion_scores = results[0]
            # FER+ emits raw scores; a softmax maps them to probabilities
            exp_scores = np.exp(emotion_scores - np.max(emotion_scores))
            emotion_probs = exp_scores / exp_scores.sum()
            dominant_emotion = emotion_model.labels[np.argmax(emotion_probs)]
            emotions = {label: float(prob) for label, prob in zip(emotion_model.labels, emotion_probs)}
            return dominant_emotion, emotions

        return "neutral", {label: 0.0 for label in emotion_model.labels}
    except Exception as e:
        print(f"Face analysis error: {str(e)}")
        return "neutral", {label: 0.0 for label in emotion_model.labels}


def analyze_voice(audio):
    """Analyze voice tone from audio"""
    return voice_classifier.predict(audio)


def update_emotion_history(face_emotion, voice_emotion):
    """Update the emotion history and current emotions"""
    global current_emotions, emotion_history, last_update_time

    current_time = datetime.now().strftime("%H:%M:%S")
    current_emotions = {
        "face": face_emotion,
        "voice": voice_emotion,
        "timestamp": current_time
    }

    if (time.time() - last_update_time) > 5 or not emotion_history:
        emotion_history.append(current_emotions.copy())
        last_update_time = time.time()

    if len(emotion_history) > 20:
        emotion_history = emotion_history[-20:]


def get_emotion_timeline():
    """Create a timeline DataFrame for display"""
    if not emotion_history:
        return pd.DataFrame(columns=["Time", "Facial Emotion", "Voice Emotion"])
    df = pd.DataFrame(emotion_history)
    df = df.rename(columns={
        "timestamp": "Time",
        "face": "Facial Emotion",
        "voice": "Voice Emotion"
    })
    return df


def get_practitioner_advice(face_emotion, voice_emotion):
    """Generate suggestions based on detected emotions"""
    advice = []

    # Facial emotion advice
    if face_emotion in ["sad", "fear"]:
        advice.append("Patient appears distressed. Consider speaking more slowly and with reassurance.")
    elif face_emotion == "angry":
        advice.append("Patient seems frustrated. Acknowledge their concerns and maintain a calm demeanor.")
    elif face_emotion == "disgust":
        advice.append("Patient may be uncomfortable. Check if they're experiencing any discomfort.")
    elif face_emotion == "surprise":
        advice.append("Patient seems surprised. Ensure they understand all information.")

    # Voice emotion advice
    if voice_emotion in ["sad", "fear"]:
        advice.append("Patient's tone suggests anxiety. Provide clear explanations and emotional support.")
    elif voice_emotion == "angry":
        advice.append("Patient sounds upset. Practice active listening and validate their feelings.")
    elif voice_emotion == "happy":
        advice.append("Patient seems positive. This may be a good time to discuss treatment options.")

    return "\n".join(advice) if advice else "Patient appears neutral. Continue with consultation."
def process_input(video, audio):
    """Process video and audio inputs to detect emotions"""
    try:
        # Process video frame
        if video is not None:
            frame = cv2.cvtColor(video, cv2.COLOR_RGB2BGR)
            face_emotion, face_details = analyze_face(frame)
        else:
            face_emotion, face_details = "neutral", {}

        # Process audio
        if audio is not None:
            voice_emotion, voice_details = analyze_voice(audio)
        else:
            voice_emotion, voice_details = "neutral", {}

        update_emotion_history(face_emotion, voice_emotion)
        timeline_df = get_emotion_timeline()
        advice = get_practitioner_advice(face_emotion, voice_emotion)

        # Return values positionally, in the same order as the Gradio output components
        return (
            face_emotion,
            voice_emotion,
            timeline_df,
            advice,
            str(face_details),
            str(voice_details),
        )
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return "Error", "Error", pd.DataFrame(), "System error occurred", "", ""


# Gradio interface
with gr.Blocks(title="Patient Emotion Recognition", theme="soft") as demo:
    gr.Markdown("# Real-Time Patient Emotion Recognition")
    gr.Markdown("Analyze facial expressions and voice tone during medical consultations")

    with gr.Row():
        with gr.Column():
            # Gradio 4.x: webcam capture is selected via `sources`, not the old `source` parameter
            video_input = gr.Image(label="Live Camera Feed", sources=["webcam"], streaming=True)
            audio_input = gr.Audio(label="Voice Input", sources=["microphone"], type="numpy")
            submit_btn = gr.Button("Analyze Emotions")

        with gr.Column():
            current_face = gr.Textbox(label="Current Facial Emotion")
            current_voice = gr.Textbox(label="Current Voice Emotion")
            advice_output = gr.Textbox(label="Practitioner Suggestions", lines=3)
            timeline_output = gr.Dataframe(label="Emotion Timeline", interactive=False)
            face_details = gr.Textbox(label="Face Analysis Details", visible=False)
            voice_details = gr.Textbox(label="Voice Analysis Details", visible=False)

    # Live processing
    video_input.change(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details],
        show_progress="hidden"
    )

    audio_input.change(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details],
        show_progress="hidden"
    )

    submit_btn.click(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details]
    )

if __name__ == "__main__":
    demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)
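
# Usage note (environment assumptions, not stated in the original script): the
# imports above imply dependencies roughly equivalent to
#   pip install gradio numpy opencv-python pandas librosa \
#       python_speech_features onnxruntime requests scikit-learn
# Running this file (e.g. `python app.py`; "app.py" is a placeholder name)
# serves the interface on port 7860, matching the server_port passed to
# demo.launch above.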