import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import threading
import queue
import time
from collections import deque
import warnings
import traceback

warnings.filterwarnings("ignore")
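# Optional dependencies are imported defensively below: each block sets an
# *_AVAILABLE flag so the rest of the app can fall back to basic audio handling,
# PIL-only imaging, or mock emotion scores instead of crashing when a library
# is missing.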
AUDIO_AVAILABLE = True
try:
    import soundfile as sf
    import librosa
    LIBROSA_AVAILABLE = True
except ImportError:
    LIBROSA_AVAILABLE = False
    print("Librosa not available - using basic audio processing")
CV2_AVAILABLE = True
try:
    import cv2
except ImportError:
    CV2_AVAILABLE = False
    print("OpenCV not available - using PIL for image processing")
try:
    from PIL import Image, ImageDraw, ImageFont
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    print("PIL not available - limited image processing")
HF_AVAILABLE = True
try:
    from transformers import pipeline
    import torch
except ImportError:
    HF_AVAILABLE = False
    print("Transformers not available - using mock emotion detection")
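# EmotionRecognitionSystem keeps a rolling history of emotion readings and wraps
# the face and voice classifiers. When transformers/torch are unavailable or the
# models fail to load, the detect_* methods return randomized mock scores so the
# UI can still be exercised end to end.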
class EmotionRecognitionSystem:

    def __init__(self):
        # Rolling window of the most recent emotion readings
        self.emotion_history = deque(maxlen=100)
        self.audio_queue = queue.Queue()
        self.video_queue = queue.Queue()
        self.setup_models()

        # Score thresholds (0-1) above which an alert is raised for each clinical state
        self.alert_thresholds = {
            'stress': 0.7,
            'anxiety': 0.6,
            'pain': 0.8,
            'confusion': 0.5
        }
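    # The thresholds above are compared against per-frame clinical scores
    # elsewhere in the pipeline (see process_frame). A minimal sketch of that
    # check, assuming scores are 0-1 floats keyed by the same names (the exact
    # message format used by the real code may differ):
    #
    #   alerts = [f"High {name} detected ({score:.2f})"
    #             for name, score in clinical_scores.items()
    #             if score >= self.alert_thresholds.get(name, 1.0)]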
    def setup_models(self):
        """Initialize emotion recognition models with better error handling"""
        self.models_loaded = False

        if not HF_AVAILABLE:
            print("Skipping model loading - transformers not available")
            return

        try:
            # NOTE: "j-hartmann/emotion-english-distilroberta-base" is a *text*
            # emotion checkpoint; a facial-expression image-classification model
            # should be substituted here for real face analysis.
            self.face_emotion_pipeline = pipeline(
                "image-classification",
                model="j-hartmann/emotion-english-distilroberta-base",
                device=0 if torch.cuda.is_available() else -1
            )

            self.audio_emotion_pipeline = pipeline(
                "audio-classification",
                model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
                device=0 if torch.cuda.is_available() else -1
            )
            self.models_loaded = True
        except Exception as e:
            print(f"Error loading models: {e}")
            print(traceback.format_exc())
            self.models_loaded = False
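    # Audio may arrive as a (sample_rate, data) tuple (Gradio's numpy audio
    # format), a (data, sample_rate) tuple, or a filepath string.
    # validate_audio_input normalizes all of these to a mono float array
    # resampled to 16 kHz.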
    def validate_audio_input(self, audio_data):
        """Validate and standardize audio input format"""
        if audio_data is None:
            return None

        try:
            if isinstance(audio_data, tuple):
                # Gradio numpy audio components return (sample_rate, data);
                # handle either ordering defensively.
                first, second = audio_data
                if isinstance(first, (int, np.integer)):
                    sample_rate, audio_array = first, second
                else:
                    audio_array, sample_rate = first, second
            elif isinstance(audio_data, str):
                if LIBROSA_AVAILABLE:
                    audio_array, sample_rate = librosa.load(audio_data, sr=None)
                else:
                    # Fallback WAV reader when librosa is unavailable
                    import wave
                    with wave.open(audio_data, 'rb') as wf:
                        sample_rate = wf.getframerate()
                        audio_array = np.frombuffer(wf.readframes(wf.getnframes()), dtype=np.int16)
                    audio_array = audio_array.astype(np.float32) / 32768.0
            else:
                return None

            audio_array = np.asarray(audio_array)
            # Integer PCM (e.g. int16 from Gradio) -> float in [-1, 1]
            if np.issubdtype(audio_array.dtype, np.integer):
                audio_array = audio_array.astype(np.float32) / np.iinfo(audio_array.dtype).max
            # Stereo -> mono
            if audio_array.ndim > 1:
                audio_array = audio_array.mean(axis=1)

            target_rate = 16000
            if sample_rate != target_rate:
                if LIBROSA_AVAILABLE:
                    audio_array = librosa.resample(audio_array, orig_sr=sample_rate, target_sr=target_rate)
                else:
                    # Crude decimation fallback (no anti-aliasing filter)
                    step = int(sample_rate / target_rate)
                    if step > 1:
                        audio_array = audio_array[::step]
                sample_rate = target_rate

            return (audio_array, sample_rate)

        except Exception as e:
            print(f"Audio validation error: {e}")
            return None
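    # Both detect_* methods return a {label: score} mapping, e.g.
    #   {"happy": 0.72, "neutral": 0.18, "sad": 0.10}
    # Labels come from whichever model is loaded, so downstream code should not
    # assume a fixed label set.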
    def detect_face_emotion(self, frame):
        """Detect emotions from facial expressions with better error handling"""
        if not self.models_loaded:
            # Mock output: random scores over a fixed label set
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear', 'surprise', 'disgust']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            # Normalize the frame to 3-channel RGB
            if isinstance(frame, np.ndarray):
                if frame.ndim == 3:
                    if frame.shape[2] == 4:
                        # Drop the alpha channel
                        rgb_frame = frame[:, :, :3]
                    elif frame.shape[2] == 3:
                        # Assumes BGR input (e.g. an OpenCV capture); Gradio webcam
                        # frames are already RGB, in which case this swap is unnecessary.
                        if CV2_AVAILABLE:
                            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        else:
                            rgb_frame = frame[:, :, ::-1]
                    else:
                        rgb_frame = frame
                else:
                    # Grayscale -> RGB
                    if CV2_AVAILABLE:
                        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
                    else:
                        rgb_frame = np.stack((frame,) * 3, axis=-1)
            else:
                rgb_frame = frame

            # The image-classification pipeline expects a PIL image (or path/URL)
            if PIL_AVAILABLE and isinstance(rgb_frame, np.ndarray):
                rgb_frame = Image.fromarray(rgb_frame.astype(np.uint8))

            results = self.face_emotion_pipeline(rgb_frame)

            # Convert pipeline output to a {label: score} dict
            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores

        except Exception as e:
            print(f"Face emotion detection error: {e}")
            return {'neutral': 1.0}
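    # The audio classifier is fed an in-memory {"array": ..., "sampling_rate": ...}
    # dict, which recent transformers audio pipelines accept (older releases may
    # expect the key "raw" instead); a filepath string would also work.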
    def detect_voice_emotion(self, audio_data):
        """Detect emotions from voice tone with better audio handling"""
        if not self.models_loaded or audio_data is None:
            # Mock output: random scores over a fixed label set
            emotions = ['neutral', 'happy', 'sad', 'angry', 'fear']
            scores = np.random.dirichlet(np.ones(len(emotions)))
            return dict(zip(emotions, scores))

        try:
            validated_audio = self.validate_audio_input(audio_data)
            if validated_audio is None:
                return {'neutral': 1.0}

            audio_array, sample_rate = validated_audio

            results = self.audio_emotion_pipeline({
                "array": audio_array,
                "sampling_rate": sample_rate
            })

            # Convert pipeline output to a {label: score} dict
            emotion_scores = {}
            for result in results:
                emotion_scores[result['label'].lower()] = result['score']

            return emotion_scores

        except Exception as e:
            print(f"Voice emotion detection error: {e}")
            return {'neutral': 1.0}
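# Gradio callback: takes the current webcam frame plus microphone audio and returns
# (annotated_frame, clinical_summary, alerts, suggestions). The helpers it calls
# (process_frame, create_emotion_overlay, format_clinical_emotions) are expected to
# be defined elsewhere in the application.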
def process_video_audio(video_frame, audio_data):
    """Process video frame and audio data with better error handling"""
    if video_frame is None:
        return None, "No video input", "", ""

    try:
        validated_audio = emotion_system.validate_audio_input(audio_data)

        emotion_record = emotion_system.process_frame(
            video_frame,
            validated_audio[0] if validated_audio else None,
            validated_audio[1] if validated_audio else 16000
        )

        annotated_frame = create_emotion_overlay(video_frame, emotion_record)

        clinical_text = format_clinical_emotions(emotion_record['clinical_emotions'])
        alerts_text = "\n".join(emotion_record['alerts']) if emotion_record['alerts'] else "No alerts"
        suggestions_text = "\n".join(emotion_record['suggestions']) if emotion_record['suggestions'] else "No suggestions"

        return annotated_frame, clinical_text, alerts_text, suggestions_text

    except Exception as e:
        print(f"Processing error: {e}")
        traceback.print_exc()
        return video_frame, "Processing error", "System error", "Please try again"
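# A minimal sketch of how process_video_audio could be wired into the Blocks UI.
# The component names and layout below are illustrative, not part of the original
# interface definition:
#
#   with gr.Row():
#       webcam = gr.Image(label="Webcam frame")
#       mic = gr.Audio(label="Microphone", type="numpy")
#   analyze = gr.Button("Analyze")
#   annotated = gr.Image(label="Annotated frame")
#   clinical = gr.Textbox(label="Clinical emotions")
#   alerts = gr.Textbox(label="Alerts")
#   suggestions = gr.Textbox(label="Suggestions")
#   analyze.click(process_video_audio, [webcam, mic],
#                 [annotated, clinical, alerts, suggestions])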
def create_interface():
    with gr.Blocks(title="Patient Emotion Recognition System", theme=gr.themes.Soft()) as demo:

        gr.Markdown("""
        ### Audio Input Notes:
        - System works best with clear microphone input
        - If you get audio errors, try:
          - Checking microphone permissions
          - Reducing background noise
          - Using a different microphone
        """)

    return demo
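# Creating emotion_system at module scope here lets process_video_audio resolve it
# as a global when the script is run directly. share=True publishes a temporary
# public Gradio URL, and server_name="0.0.0.0" listens on all network interfaces.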
if __name__ == "__main__":
    emotion_system = EmotionRecognitionSystem()
    demo = create_interface()
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )