app update
app.py CHANGED
@@ -11,41 +11,90 @@ import requests
 import os
 from sklearn.preprocessing import StandardScaler
 
-#
+# Constants
 MODEL_URL = "https://github.com/onnx/models/raw/main/vision/body_analysis/emotion_ferplus/model/emotion-ferplus-8.onnx"
 MODEL_PATH = "emotion-ferplus-8.onnx"
+MODEL_CHECKSUM_SIZE = 2483870  # Expected file size in bytes for verification
 
+class EmotionModel:
+    def __init__(self):
+        self.session = None
+        self.labels = ['neutral', 'happy', 'surprise', 'sad', 'angry', 'disgust', 'fear', 'contempt']
+        self.load_model()
+
+    def download_model(self):
+        try:
+            print("Downloading emotion recognition model...")
+            response = requests.get(MODEL_URL, stream=True, timeout=30)
+            response.raise_for_status()
+
+            with open(MODEL_PATH, "wb") as f:
+                for chunk in response.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+            # Verify download
+            if os.path.exists(MODEL_PATH):
+                actual_size = os.path.getsize(MODEL_PATH)
+                if actual_size != MODEL_CHECKSUM_SIZE:
+                    print(f"Warning: Downloaded file size {actual_size} doesn't match expected size {MODEL_CHECKSUM_SIZE}")
+                return True
+            return False
+        except Exception as e:
+            print(f"Download failed: {str(e)}")
+            return False
+
+    def load_model(self):
+        if not os.path.exists(MODEL_PATH):
+            if not self.download_model():
+                print("Using dummy emotion model")
+                self.session = DummyEmotionSession()
+                return
+
+        try:
+            so = ort.SessionOptions()
+            so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
+            self.session = ort.InferenceSession(MODEL_PATH, so)
+            print("Emotion model loaded successfully")
+        except Exception as e:
+            print(f"Failed to load ONNX model: {str(e)}")
+            print("Using dummy emotion model")
+            self.session = DummyEmotionSession()
+
+    def predict(self, frame):
+        return self.session.run(None, {'Input3': frame})[0]
 
+class DummyEmotionSession:
+    def run(self, *args, **kwargs):
+        # Return mostly neutral with slight random variations
+        base = np.array([0.8] + [0.1]*7)
+        variation = np.random.normal(0, 0.01, size=8)
+        return [np.clip(base + variation, 0, 1).reshape(1, -1)]
 
-# Simple voice emotion classifier
 class VoiceEmotionClassifier:
     def __init__(self):
        self.scaler = StandardScaler()
+        # Initialize with dummy data for scaling
+        dummy_features = np.random.randn(100, 13)
+        self.scaler.fit(dummy_features)
 
     def extract_features(self, audio):
-
-        # Convert to mono if stereo
-        if len(y.shape) > 1:
-            y = np.mean(y, axis=0)
-
-        # Resample to 16kHz if needed
-        if sr != 16000:
-            y = librosa.resample(y, orig_sr=sr, target_sr=16000)
-            sr = 16000
+        try:
+            sr, y = audio
+            y = y.astype(np.float32)
+
+            if len(y.shape) > 1:  # Convert stereo to mono
+                y = np.mean(y, axis=0)
+
+            if sr != 16000:  # Resample if needed
+                y = librosa.resample(y, orig_sr=sr, target_sr=16000)
+                sr = 16000
+
+            mfcc_features = mfcc(y, sr, numcep=13)
+            return np.mean(mfcc_features, axis=0)
+        except Exception as e:
+            print(f"Feature extraction error: {str(e)}")
+            return np.zeros(13)
 
     def predict(self, audio):
         try:
@@ -53,17 +102,20 @@ class VoiceEmotionClassifier:
             features = self.scaler.transform(features)
 
             # Simple rule-based classifier (replace with actual trained model)
-            if features[0, 0] > 0
+            if features[0, 0] > 1.0:
                 return "happy", [{"label": "happy", "score": 0.8}]
-            elif features[0, 0] < -0
+            elif features[0, 0] < -1.0:
                 return "sad", [{"label": "sad", "score": 0.7}]
+            elif abs(features[0, 1]) > 0.8:
+                return "angry", [{"label": "angry", "score": 0.6}]
             else:
                 return "neutral", [{"label": "neutral", "score": 0.9}]
         except Exception as e:
-            print(f"Voice
+            print(f"Voice prediction error: {str(e)}")
             return "neutral", [{"label": "neutral", "score": 1.0}]
 
 # Initialize models
+emotion_model = EmotionModel()
 voice_classifier = VoiceEmotionClassifier()
 
 # Global variables to store results
@@ -74,7 +126,6 @@ last_update_time = time.time()
 def analyze_face(frame):
     """Analyze facial expressions in the frame using ONNX model"""
     try:
-        # Preprocess frame
         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
         face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
         faces = face_cascade.detectMultiScale(gray, 1.3, 5)
@@ -84,26 +135,19 @@ def analyze_face(frame):
             face_roi = gray[y:y+h, x:x+w]
             face_roi = cv2.resize(face_roi, (64, 64))
             face_roi = face_roi.astype('float32') / 255.0
-            face_roi = np.expand_dims(face_roi, axis=0)
-            face_roi = np.expand_dims(face_roi, axis=0)
+            face_roi = np.expand_dims(face_roi, axis=(0, 1))
 
-            input_name = emotion_session.get_inputs()[0].name
-            output_name = emotion_session.get_outputs()[0].name
-            results = emotion_session.run([output_name], {input_name: face_roi})[0]
-            # Get emotion probabilities
+            results = emotion_model.predict(face_roi)
             emotion_probs = results[0]
-            dominant_emotion =
+            dominant_emotion = emotion_model.labels[np.argmax(emotion_probs)]
 
-            emotions = {label: float(prob) for label, prob in zip(emotion_labels, emotion_probs)}
+            emotions = {label: float(prob) for label, prob in zip(emotion_model.labels, emotion_probs)}
             return dominant_emotion, emotions
 
-        return "neutral", {label: 0.0 for label in
+        return "neutral", {label: 0.0 for label in emotion_model.labels}
     except Exception as e:
-        print(f"Face analysis error: {e}")
-        return "neutral", {label: 0.0 for label in
+        print(f"Face analysis error: {str(e)}")
+        return "neutral", {label: 0.0 for label in emotion_model.labels}
 
 def analyze_voice(audio):
     """Analyze voice tone from audio"""
@@ -114,24 +158,16 @@ def update_emotion_history(face_emotion, voice_emotion):
     global current_emotions, emotion_history, last_update_time
 
     current_time = datetime.now().strftime("%H:%M:%S")
-
-    # Update current emotions
     current_emotions = {
         "face": face_emotion,
         "voice": voice_emotion,
         "timestamp": current_time
     }
 
-    # Add to history (every 5 seconds or when emotion changes significantly)
     if (time.time() - last_update_time) > 5 or not emotion_history:
-        emotion_history.append(
-            "timestamp": current_time,
-            "face": face_emotion,
-            "voice": voice_emotion
-        })
+        emotion_history.append(current_emotions.copy())
         last_update_time = time.time()
 
-    # Keep only last 20 entries
     if len(emotion_history) > 20:
         emotion_history = emotion_history[-20:]
 
@@ -188,13 +224,11 @@ def process_input(video, audio):
         else:
             voice_emotion, voice_details = "neutral", {}
 
-        # Update history and get outputs
         update_emotion_history(face_emotion, voice_emotion)
         timeline_df = get_emotion_timeline()
        advice = get_practitioner_advice(face_emotion, voice_emotion)
 
-
-        outputs = {
+        return {
             "current_face": face_emotion,
             "current_voice": voice_emotion,
             "timeline": timeline_df,
@@ -202,10 +236,8 @@ def process_input(video, audio):
             "face_details": str(face_details),
             "voice_details": str(voice_details)
         }
-
-        return outputs
     except Exception as e:
-        print(f"Processing error: {e}")
+        print(f"Processing error: {str(e)}")
         return {
             "current_face": "Error",
             "current_voice": "Error",
@@ -256,4 +288,4 @@ with gr.Blocks(title="Patient Emotion Recognition", theme="soft") as demo:
     )
 
 if __name__ == "__main__":
-    demo.launch(debug=True)
+    demo.launch(debug=True, server_name="0.0.0.0", server_port=7860)