import gradio as gr
import numpy as np
import cv2
import pandas as pd
from datetime import datetime
import time
import librosa
import joblib
import onnxruntime as ort
import requests
import os
from sklearn.preprocessing import StandardScaler
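
# Third-party packages used above: gradio, numpy, opencv-python, pandas, librosa,
# joblib, onnxruntime, requests, scikit-learn (specific versions are not pinned here).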

# FER+ facial emotion model from the ONNX model zoo
MODEL_URL = "https://github.com/onnx/models/raw/main/vision/body_analysis/emotion_ferplus/model/emotion-ferplus-8.onnx"
MODEL_PATH = "emotion-ferplus-8.onnx"
MODEL_EXPECTED_SIZE = 2483870  # expected file size in bytes (a size sanity check, not a checksum)

# Optional pretrained voice-emotion artifacts; a rule-based fallback is used if they are missing
VOICE_MODEL_PATH = "voice_emotion_model.pkl"
VOICE_SCALER_PATH = "voice_scaler.pkl"


class EmotionModel:
    """Facial emotion recognition with the FER+ ONNX model, smoothed over recent frames."""

    def __init__(self):
        self.session = None
        # FER+ class labels, in the order the model outputs its scores
        self.labels = ['neutral', 'happy', 'surprise', 'sad', 'angry', 'disgust', 'fear', 'contempt']
        self.emotion_buffer = []
        self.load_model()

    def download_model(self):
        """Download the ONNX model to MODEL_PATH, returning True on success."""
        try:
            print("Downloading emotion recognition model...")
            response = requests.get(MODEL_URL, stream=True, timeout=30)
            response.raise_for_status()

            with open(MODEL_PATH, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            if os.path.exists(MODEL_PATH):
                actual_size = os.path.getsize(MODEL_PATH)
                if actual_size != MODEL_EXPECTED_SIZE:
                    print(f"Warning: downloaded file size {actual_size} does not match expected size {MODEL_EXPECTED_SIZE}")
                return True
            return False
        except Exception as e:
            print(f"Download failed: {str(e)}")
            return False

    def load_model(self):
        if not os.path.exists(MODEL_PATH):
            if not self.download_model():
                raise RuntimeError("Failed to download emotion model")

        try:
            so = ort.SessionOptions()
            so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
            self.session = ort.InferenceSession(MODEL_PATH, so)

            # Smoke-test the session: FER+ takes a 1x1x64x64 float32 tensor named 'Input3'
            dummy_input = np.random.rand(1, 1, 64, 64).astype(np.float32)
            self.session.run(None, {'Input3': dummy_input})
            print("Emotion model loaded and verified")
        except Exception as e:
            raise RuntimeError(f"Failed to load/verify ONNX model: {str(e)}")

    def softmax(self, x):
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum()

    def predict(self, frame):
        raw_prediction = self.session.run(None, {'Input3': frame})[0][0]
        self.emotion_buffer.append(raw_prediction)

        # Keep only the last 5 raw score vectors for temporal smoothing
        if len(self.emotion_buffer) > 5:
            self.emotion_buffer = self.emotion_buffer[-5:]

        # Average the buffered scores, then convert to probabilities
        smoothed_scores = np.mean(self.emotion_buffer, axis=0)
        return self.softmax(smoothed_scores).reshape(1, -1)
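

# Note: voice_emotion_model.pkl and voice_scaler.pkl are assumed to be a scikit-learn
# classifier exposing predict_proba and its matching scaler, trained on the same
# 29-dimensional feature vector produced by extract_features() below.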
class VoiceEmotionClassifier:
    """Voice-tone emotion classifier: a pretrained model if available, otherwise a rule-based fallback."""

    # 13 MFCC means + 13 MFCC stds + pitch mean/std + spectral-centroid mean
    N_FEATURES = 29

    def __init__(self):
        try:
            if os.path.exists(VOICE_MODEL_PATH) and os.path.exists(VOICE_SCALER_PATH):
                self.model = joblib.load(VOICE_MODEL_PATH)
                self.scaler = joblib.load(VOICE_SCALER_PATH)
                self.labels = ['neutral', 'happy', 'sad', 'angry', 'fear']
                print("Loaded pretrained voice emotion model")
            else:
                raise FileNotFoundError("Pretrained voice model not found")
        except Exception as e:
            print(f"Voice model loading failed: {str(e)}")
            print("Using limited rule-based voice analysis")
            self.model = None
            self.scaler = StandardScaler()
            # Fit the fallback scaler on random data of the same dimensionality as
            # extract_features(), so transform() accepts real feature vectors
            dummy_features = np.random.randn(100, self.N_FEATURES)
            self.scaler.fit(dummy_features)
            self.labels = ['neutral', 'happy', 'sad', 'angry', 'fear']

    def extract_features(self, audio):
        try:
            # Gradio's numpy audio is a (sample_rate, samples) tuple
            sr, y = audio
            y = np.asarray(y)
            if np.issubdtype(y.dtype, np.integer):
                y = y.astype(np.float32) / np.iinfo(y.dtype).max
            else:
                y = y.astype(np.float32)

            features = []

            # Downmix stereo (Gradio uses shape (samples, channels)) and resample to 16 kHz
            if y.ndim > 1:
                y = np.mean(y, axis=1)
            if sr != 16000:
                y = librosa.resample(y, orig_sr=sr, target_sr=16000)
                sr = 16000

            # MFCC statistics
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            features.extend(np.mean(mfccs, axis=1))
            features.extend(np.std(mfccs, axis=1))

            # Pitch statistics (fundamental frequency via YIN)
            pitches = librosa.yin(y, fmin=80, fmax=400, sr=sr)
            features.append(np.nanmean(pitches))
            features.append(np.nanstd(pitches))

            # Spectral centroid
            spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
            features.append(np.mean(spectral_centroid))

            return np.array(features)
        except Exception as e:
            print(f"Feature extraction error: {str(e)}")
            return np.zeros(self.N_FEATURES)
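
    # The fixed thresholds in the rule-based branch below are illustrative heuristics,
    # not calibrated values.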
    def predict(self, audio):
        try:
            features = self.extract_features(audio).reshape(1, -1)
            features = self.scaler.transform(features)

            if self.model is not None:
                probs = self.model.predict_proba(features)[0]
                emotion = self.labels[np.argmax(probs)]
                details = [{"label": l, "score": p} for l, p in zip(self.labels, probs)]
            else:
                # Rule-based fallback keyed on the scaled first two MFCC means
                if features[0, 0] > 1.0:
                    emotion = "happy"
                    details = [{"label": "happy", "score": 0.8}]
                elif features[0, 0] < -1.0:
                    emotion = "sad"
                    details = [{"label": "sad", "score": 0.7}]
                elif abs(features[0, 1]) > 0.8:
                    emotion = "angry"
                    details = [{"label": "angry", "score": 0.6}]
                else:
                    emotion = "neutral"
                    details = [{"label": "neutral", "score": 0.9}]

            return emotion, details
        except Exception as e:
            print(f"Voice prediction error: {str(e)}")
            return "neutral", [{"label": "neutral", "score": 1.0}]


emotion_model = EmotionModel()
voice_classifier = VoiceEmotionClassifier()
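
# Global state shared by all connected clients (not per-session)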
emotion_history = []
current_emotions = {"face": "neutral", "voice": "neutral"}
last_update_time = time.time()
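

# Face detection uses OpenCV's bundled Haar cascade; only the first detected face is analyzed.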
def analyze_face(frame):
    """Analyze facial expressions in the frame using the ONNX model."""
    try:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)

        if len(faces) > 0:
            x, y, w, h = faces[0]
            face_roi = gray[y:y+h, x:x+w]

            # Preprocess to the 1x1x64x64 float32 tensor the model expects
            face_roi = cv2.resize(face_roi, (64, 64))
            face_roi = face_roi.astype('float32')
            face_roi = (face_roi - 127.5) / 127.5
            face_roi = np.expand_dims(face_roi, axis=(0, 1))

            results = emotion_model.predict(face_roi)
            emotion_probs = results[0]

            # Report "uncertain" when no class is clearly dominant
            if np.max(emotion_probs) < 0.5:
                return "uncertain", {label: 0.0 for label in emotion_model.labels}

            dominant_emotion = emotion_model.labels[np.argmax(emotion_probs)]
            emotions = {label: float(prob) for label, prob in zip(emotion_model.labels, emotion_probs)}
            return dominant_emotion, emotions

        return "neutral", {label: 0.0 for label in emotion_model.labels}
    except Exception as e:
        print(f"Face analysis error: {str(e)}")
        return "neutral", {label: 0.0 for label in emotion_model.labels}


def analyze_voice(audio):
    """Analyze voice tone from audio."""
    return voice_classifier.predict(audio)


def update_emotion_history(face_emotion, voice_emotion):
    """Update the emotion history and current emotions."""
    global current_emotions, emotion_history, last_update_time

    current_time = datetime.now().strftime("%H:%M:%S")
    current_emotions = {
        "face": face_emotion,
        "voice": voice_emotion,
        "timestamp": current_time
    }

    # Record at most one entry every 5 seconds and keep only the 20 most recent
    if (time.time() - last_update_time) > 5 or not emotion_history:
        emotion_history.append(current_emotions.copy())
        last_update_time = time.time()

    if len(emotion_history) > 20:
        emotion_history = emotion_history[-20:]


def get_emotion_timeline():
    """Create a timeline DataFrame for display."""
    if not emotion_history:
        return pd.DataFrame(columns=["Time", "Facial Emotion", "Voice Emotion"])

    df = pd.DataFrame(emotion_history)
    df = df.rename(columns={
        "timestamp": "Time",
        "face": "Facial Emotion",
        "voice": "Voice Emotion"
    })
    return df


def get_practitioner_advice(face_emotion, voice_emotion):
    """Generate suggestions based on detected emotions."""
    advice = []

    # Facial-expression cues
    if face_emotion in ["sad", "fear"]:
        advice.append("Patient appears distressed. Consider speaking more slowly and with reassurance.")
    elif face_emotion == "angry":
        advice.append("Patient seems frustrated. Acknowledge their concerns and maintain a calm demeanor.")
    elif face_emotion == "disgust":
        advice.append("Patient may be uncomfortable. Check if they're experiencing any discomfort.")
    elif face_emotion == "surprise":
        advice.append("Patient seems surprised. Ensure they understand all information.")
    elif face_emotion == "uncertain":
        advice.append("Facial expression unclear. Pay closer attention to verbal cues.")

    # Voice-tone cues
    if voice_emotion in ["sad", "fear"]:
        advice.append("Patient's tone suggests anxiety. Provide clear explanations and emotional support.")
    elif voice_emotion == "angry":
        advice.append("Patient sounds upset. Practice active listening and validate their feelings.")
    elif voice_emotion == "happy":
        advice.append("Patient seems positive. This may be a good time to discuss treatment options.")

    return "\n".join(advice) if advice else "Patient appears neutral. Continue with consultation."


def process_input(video, audio):
    """Process video and audio inputs to detect emotions."""
    try:
        # Facial analysis (Gradio delivers RGB frames; OpenCV expects BGR)
        if video is not None:
            frame = cv2.cvtColor(video, cv2.COLOR_RGB2BGR)
            face_emotion, face_details = analyze_face(frame)
        else:
            face_emotion, face_details = "neutral", {}

        # Voice analysis
        if audio is not None:
            voice_emotion, voice_details = analyze_voice(audio)
        else:
            voice_emotion, voice_details = "neutral", {}

        update_emotion_history(face_emotion, voice_emotion)
        timeline_df = get_emotion_timeline()
        advice = get_practitioner_advice(face_emotion, voice_emotion)

        return (
            face_emotion,
            voice_emotion,
            timeline_df,
            advice,
            str(face_details),
            str(voice_details)
        )
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return (
            "Error",
            "Error",
            pd.DataFrame(),
            "System error occurred",
            "",
            ""
        )
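

# Gradio UI: webcam and microphone inputs on the left, live analysis results on the right.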
with gr.Blocks(title="Patient Emotion Recognition", theme="soft") as demo:
    gr.Markdown("# Real-Time Patient Emotion Recognition")
    gr.Markdown("Analyze facial expressions and voice tone during medical consultations")

    with gr.Row():
        with gr.Column():
            # streaming=True requires the webcam source
            video_input = gr.Image(label="Live Camera Feed", sources=["webcam"], streaming=True)
            audio_input = gr.Audio(label="Voice Input", sources=["microphone"], type="numpy")
            submit_btn = gr.Button("Analyze Emotions")

        with gr.Column():
            current_face = gr.Textbox(label="Current Facial Emotion")
            current_voice = gr.Textbox(label="Current Voice Emotion")
            advice_output = gr.Textbox(label="Practitioner Suggestions", lines=3)
            timeline_output = gr.Dataframe(label="Emotion Timeline", interactive=False)
            face_details = gr.Textbox(label="Face Analysis Details", visible=False)
            voice_details = gr.Textbox(label="Voice Analysis Details", visible=False)

    # Re-run the analysis whenever either input changes, and on explicit submit
    video_input.change(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details],
        show_progress="hidden"
    )

    audio_input.change(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details],
        show_progress="hidden"
    )

    submit_btn.click(
        process_input,
        inputs=[video_input, audio_input],
        outputs=[current_face, current_voice, timeline_output, advice_output, face_details, voice_details]
    )


if __name__ == "__main__":
    demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)