Spaces:
Sleeping
Sleeping
import cv2 | |
import mediapipe as mp | |
import numpy as np | |
from typing import List, Dict, Tuple | |
class PoseAnalyzer: | |
# Add MediaPipe skeleton connections as a class variable | |
MP_CONNECTIONS = [ | |
(11, 13), (13, 15), # Left arm | |
(12, 14), (14, 16), # Right arm | |
(11, 12), # Shoulders | |
(11, 23), (12, 24), # Torso sides | |
(23, 24), # Hips | |
(23, 25), (25, 27), # Left leg | |
(24, 26), (26, 28), # Right leg | |
(27, 31), (28, 32), # Ankles to feet | |
(15, 17), (16, 18), # Wrists to hands | |
(15, 19), (16, 20), # Wrists to pinky | |
(15, 21), (16, 22), # Wrists to index | |
(15, 17), (17, 19), (19, 21), # Left hand | |
(16, 18), (18, 20), (20, 22) # Right hand | |
] | |
def __init__(self): | |
# Initialize MediaPipe Pose | |
self.mp_pose = mp.solutions.pose | |
self.pose = self.mp_pose.Pose( | |
static_image_mode=False, | |
model_complexity=2, # Using the most accurate model | |
min_detection_confidence=0.1, | |
min_tracking_confidence=0.1 | |
) | |
self.mp_drawing = mp.solutions.drawing_utils | |
# Define key angles for bodybuilding poses | |
self.key_angles = { | |
'front_double_biceps': { | |
'shoulder_angle': (90, 120), # Expected angle range | |
'elbow_angle': (80, 100), | |
'wrist_angle': (0, 20) | |
}, | |
'side_chest': { | |
'shoulder_angle': (45, 75), | |
'elbow_angle': (90, 110), | |
'wrist_angle': (0, 20) | |
}, | |
'back_double_biceps': { | |
'shoulder_angle': (90, 120), | |
'elbow_angle': (80, 100), | |
'wrist_angle': (0, 20) | |
} | |
} | |
def detect_pose(self, frame: np.ndarray, last_valid_landmarks=None) -> Tuple[np.ndarray, List[Dict]]: | |
""" | |
Detect pose in the given frame and return the frame with pose landmarks drawn | |
and the list of detected landmarks. If detection fails, reuse last valid landmarks if provided. | |
""" | |
# Convert the BGR image to RGB | |
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Process the frame and detect pose | |
results = self.pose.process(rgb_frame) | |
# Draw the pose landmarks on the frame | |
if results.pose_landmarks: | |
# Draw all 33 keypoints as bright red, smaller circles, and show index | |
for idx, landmark in enumerate(results.pose_landmarks.landmark): | |
x = int(landmark.x * frame.shape[1]) | |
y = int(landmark.y * frame.shape[0]) | |
if landmark.visibility > 0.1: # Lowered threshold from 0.3 to 0.1 | |
cv2.circle(frame, (x, y), 3, (0, 0, 255), -1) | |
cv2.putText(frame, str(idx), (x+8, y-8), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
# Draw skeleton lines | |
# Convert landmarks to pixel coordinates for easier access | |
landmark_points = [] | |
for landmark in results.pose_landmarks.landmark: | |
landmark_points.append((int(landmark.x * frame.shape[1]), int(landmark.y * frame.shape[0]), landmark.visibility)) | |
for pt1, pt2 in self.MP_CONNECTIONS: | |
if pt1 < len(landmark_points) and pt2 < len(landmark_points): | |
x1, y1, v1 = landmark_points[pt1] | |
x2, y2, v2 = landmark_points[pt2] | |
if v1 > 0.1 and v2 > 0.1: | |
cv2.line(frame, (x1, y1), (x2, y2), (0, 255, 255), 2) | |
# Convert landmarks to a list of dictionaries | |
landmarks = [] | |
for idx, landmark in enumerate(results.pose_landmarks.landmark): | |
landmarks.append({ | |
'x': landmark.x, | |
'y': landmark.y, | |
'z': landmark.z, | |
'visibility': landmark.visibility | |
}) | |
return frame, landmarks | |
# If detection fails, reuse last valid landmarks if provided | |
if last_valid_landmarks is not None: | |
return frame, last_valid_landmarks | |
return frame, [] | |
def calculate_angle(self, landmarks: List[Dict], joint1: int, joint2: int, joint3: int) -> float: | |
""" | |
Calculate the angle between three joints. | |
""" | |
if len(landmarks) < max(joint1, joint2, joint3): | |
return None | |
# Get the coordinates of the three joints | |
p1 = np.array([landmarks[joint1]['x'], landmarks[joint1]['y']]) | |
p2 = np.array([landmarks[joint2]['x'], landmarks[joint2]['y']]) | |
p3 = np.array([landmarks[joint3]['x'], landmarks[joint3]['y']]) | |
# Calculate the angle | |
v1 = p1 - p2 | |
v2 = p3 - p2 | |
angle = np.degrees(np.arccos( | |
np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)) | |
)) | |
return angle | |
def analyze_pose(self, landmarks: List[Dict], pose_type: str) -> Dict: | |
""" | |
Analyze the pose and provide feedback based on the pose type. | |
Enhanced: Calculates angles for both left and right arms (shoulder, elbow, wrist) for all pose types. | |
""" | |
if not landmarks or pose_type not in self.key_angles: | |
return {'error': 'Invalid pose type or no landmarks detected'} | |
feedback = { | |
'pose_type': pose_type, | |
'angles': {}, | |
'corrections': [] | |
} | |
# Indices for MediaPipe 33 keypoints | |
LEFT_SHOULDER = 11 | |
RIGHT_SHOULDER = 12 | |
LEFT_ELBOW = 13 | |
RIGHT_ELBOW = 14 | |
LEFT_WRIST = 15 | |
RIGHT_WRIST = 16 | |
LEFT_HIP = 23 | |
RIGHT_HIP = 24 | |
LEFT_KNEE = 25 | |
RIGHT_KNEE = 26 | |
LEFT_ANKLE = 27 | |
RIGHT_ANKLE = 28 | |
# Calculate angles for both arms | |
# Shoulder angles (hip-shoulder-elbow) | |
l_shoulder_angle = self.calculate_angle(landmarks, LEFT_HIP, LEFT_SHOULDER, LEFT_ELBOW) | |
r_shoulder_angle = self.calculate_angle(landmarks, RIGHT_HIP, RIGHT_SHOULDER, RIGHT_ELBOW) | |
# Elbow angles (shoulder-elbow-wrist) | |
l_elbow_angle = self.calculate_angle(landmarks, LEFT_SHOULDER, LEFT_ELBOW, LEFT_WRIST) | |
r_elbow_angle = self.calculate_angle(landmarks, RIGHT_SHOULDER, RIGHT_ELBOW, RIGHT_WRIST) | |
# Wrist angles (elbow-wrist-hand index, if available) | |
# MediaPipe does not have hand index, so we can use a pseudo point (e.g., extend wrist direction) | |
# For now, skip wrist angle or set to None | |
# Leg angles (optional) | |
l_knee_angle = self.calculate_angle(landmarks, LEFT_HIP, LEFT_KNEE, LEFT_ANKLE) | |
r_knee_angle = self.calculate_angle(landmarks, RIGHT_HIP, RIGHT_KNEE, RIGHT_ANKLE) | |
# Add angles to feedback | |
if l_shoulder_angle: | |
feedback['angles']['L Shoulder'] = l_shoulder_angle | |
if not self.key_angles[pose_type]['shoulder_angle'][0] <= l_shoulder_angle <= self.key_angles[pose_type]['shoulder_angle'][1]: | |
feedback['corrections'].append( | |
f"Adjust L Shoulder to {self.key_angles[pose_type]['shoulder_angle'][0]}-{self.key_angles[pose_type]['shoulder_angle'][1]} deg" | |
) | |
if r_shoulder_angle: | |
feedback['angles']['R Shoulder'] = r_shoulder_angle | |
if not self.key_angles[pose_type]['shoulder_angle'][0] <= r_shoulder_angle <= self.key_angles[pose_type]['shoulder_angle'][1]: | |
feedback['corrections'].append( | |
f"Adjust R Shoulder to {self.key_angles[pose_type]['shoulder_angle'][0]}-{self.key_angles[pose_type]['shoulder_angle'][1]} deg" | |
) | |
if l_elbow_angle: | |
feedback['angles']['L Elbow'] = l_elbow_angle | |
if not self.key_angles[pose_type]['elbow_angle'][0] <= l_elbow_angle <= self.key_angles[pose_type]['elbow_angle'][1]: | |
feedback['corrections'].append( | |
f"Adjust L Elbow to {self.key_angles[pose_type]['elbow_angle'][0]}-{self.key_angles[pose_type]['elbow_angle'][1]} deg" | |
) | |
if r_elbow_angle: | |
feedback['angles']['R Elbow'] = r_elbow_angle | |
if not self.key_angles[pose_type]['elbow_angle'][0] <= r_elbow_angle <= self.key_angles[pose_type]['elbow_angle'][1]: | |
feedback['corrections'].append( | |
f"Adjust R Elbow to {self.key_angles[pose_type]['elbow_angle'][0]}-{self.key_angles[pose_type]['elbow_angle'][1]} deg" | |
) | |
# Optionally add knee angles | |
if l_knee_angle: | |
feedback['angles']['L Knee'] = l_knee_angle | |
if r_knee_angle: | |
feedback['angles']['R Knee'] = r_knee_angle | |
return feedback | |
def process_frame(self, frame: np.ndarray, pose_type: str = 'front_double_biceps', last_valid_landmarks=None) -> Tuple[np.ndarray, Dict, List[Dict]]: | |
""" | |
Process a single frame, detect pose, and analyze it. Returns frame, analysis, and used landmarks. | |
""" | |
# Detect pose | |
frame_with_pose, landmarks = self.detect_pose(frame, last_valid_landmarks=last_valid_landmarks) | |
# Analyze pose if landmarks are detected | |
analysis = self.analyze_pose(landmarks, pose_type) if landmarks else {'error': 'No pose detected'} | |
return frame_with_pose, analysis, landmarks |