Update src/streamlit_app.py
Browse files- src/streamlit_app.py +80 -66
src/streamlit_app.py
CHANGED
@@ -1,35 +1,41 @@
|
|
1 |
import streamlit as st
|
2 |
-
import av
|
3 |
-
import cv2
|
4 |
import numpy as np
|
5 |
-
import mediapipe as mp
|
6 |
-
from streamlit_webrtc import webrtc_streamer, WebRtcMode
|
7 |
|
8 |
-
# Initialize MediaPipe Pose
|
9 |
mp_pose = mp.solutions.pose
|
10 |
mp_drawing = mp.solutions.drawing_utils
|
11 |
|
12 |
-
#
|
13 |
-
if 'camera_access' not in st.session_state:
|
14 |
-
st.session_state.camera_access = False
|
15 |
if 'posture_status' not in st.session_state:
|
16 |
-
st.session_state.posture_status = "
|
17 |
-
if 'last_status' not in st.session_state:
|
18 |
-
st.session_state.last_status = ""
|
19 |
|
20 |
def analyze_posture(image):
|
21 |
-
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
22 |
with mp_pose.Pose(
|
23 |
min_detection_confidence=0.5,
|
24 |
min_tracking_confidence=0.5,
|
25 |
model_complexity=1
|
26 |
) as pose:
|
27 |
|
|
|
28 |
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
29 |
results = pose.process(image_rgb)
|
30 |
|
31 |
annotated_image = image.copy()
|
|
|
32 |
if results.pose_landmarks:
|
|
|
33 |
mp_drawing.draw_landmarks(
|
34 |
annotated_image,
|
35 |
results.pose_landmarks,
|
@@ -39,15 +45,22 @@ def analyze_posture(image):
|
|
39 |
)
|
40 |
posture_status = check_posture(results.pose_landmarks, image.shape)
|
41 |
else:
|
42 |
-
posture_status = "
|
43 |
|
44 |
return annotated_image, posture_status
|
45 |
|
46 |
def check_posture(landmarks, image_shape):
|
47 |
-
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
48 |
h, w, _ = image_shape
|
49 |
|
50 |
-
# Get key points
|
51 |
left_shoulder = landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER]
|
52 |
right_shoulder = landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER]
|
53 |
left_hip = landmarks.landmark[mp_pose.PoseLandmark.LEFT_HIP]
|
@@ -56,102 +69,103 @@ def check_posture(landmarks, image_shape):
|
|
56 |
right_ear = landmarks.landmark[mp_pose.PoseLandmark.RIGHT_EAR]
|
57 |
nose = landmarks.landmark[mp_pose.PoseLandmark.NOSE]
|
58 |
|
59 |
-
# Determine
|
60 |
sitting = left_hip.y < left_shoulder.y + 0.1 or right_hip.y < right_shoulder.y + 0.1
|
61 |
|
62 |
messages = []
|
63 |
|
64 |
-
#
|
65 |
head_forward = (left_ear.y > left_shoulder.y + 0.1 or right_ear.y > right_shoulder.y + 0.1) and \
|
66 |
(nose.y > left_shoulder.y or nose.y > right_shoulder.y)
|
67 |
if head_forward:
|
68 |
-
messages.append("•
|
69 |
|
70 |
-
#
|
71 |
shoulders_rounded = left_shoulder.x > left_hip.x + 0.05 or right_shoulder.x < right_hip.x - 0.05
|
72 |
if shoulders_rounded:
|
73 |
-
messages.append("•
|
74 |
|
75 |
-
#
|
76 |
shoulder_diff = abs(left_shoulder.y - right_shoulder.y)
|
77 |
hip_diff = abs(left_hip.y - right_hip.y)
|
78 |
if shoulder_diff > 0.05 or hip_diff > 0.05:
|
79 |
-
messages.append("•
|
80 |
|
81 |
-
#
|
82 |
if sitting and (left_hip.y < left_shoulder.y + 0.15 or right_hip.y < right_shoulder.y + 0.15):
|
83 |
-
messages.append("•
|
84 |
|
85 |
# Generate final report
|
86 |
if messages:
|
87 |
report = [
|
88 |
-
f"**{'
|
89 |
*messages,
|
90 |
-
"\n
|
91 |
-
"•
|
92 |
-
"•
|
93 |
-
"•
|
94 |
-
"•
|
95 |
]
|
96 |
else:
|
97 |
report = [
|
98 |
-
f"
|
99 |
-
"
|
100 |
-
"\n
|
101 |
-
"•
|
102 |
]
|
103 |
|
104 |
return "\n\n".join(report)
|
105 |
|
106 |
def video_frame_callback(frame):
|
107 |
-
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
108 |
img = frame.to_ndarray(format="bgr24")
|
109 |
|
110 |
try:
|
|
|
111 |
analyzed_img, posture_status = analyze_posture(img)
|
112 |
-
|
113 |
-
st.session_state.posture_status = posture_status
|
114 |
-
st.session_state.last_status = posture_status
|
115 |
return av.VideoFrame.from_ndarray(analyzed_img, format="bgr24")
|
116 |
except Exception as e:
|
117 |
-
st.error(f"
|
118 |
-
return
|
119 |
|
120 |
def main():
|
|
|
121 |
st.set_page_config(layout="wide")
|
122 |
-
st.title("📷
|
123 |
|
124 |
-
# Create
|
125 |
col1, col2 = st.columns([2, 1])
|
126 |
|
127 |
with col1:
|
128 |
-
st.header("
|
129 |
|
130 |
-
|
131 |
-
|
132 |
-
|
133 |
-
|
134 |
-
|
135 |
-
|
136 |
-
|
137 |
-
|
138 |
-
|
139 |
-
|
140 |
-
|
141 |
-
|
142 |
-
|
143 |
-
|
144 |
-
|
145 |
-
)
|
146 |
-
|
147 |
-
if not webrtc_ctx.state.playing:
|
148 |
-
st.session_state.posture_status = "カメラが停止しました (Camera stopped)"
|
149 |
-
st.session_state.last_status = ""
|
150 |
|
151 |
with col2:
|
152 |
-
st.header("
|
153 |
-
|
154 |
-
status_placeholder.markdown(st.session_state.posture_status)
|
155 |
|
156 |
if __name__ == "__main__":
|
157 |
main()
|
|
|
1 |
import streamlit as st
|
2 |
+
import av # For video frame processing
|
3 |
+
import cv2 # OpenCV for image processing
|
4 |
import numpy as np
|
5 |
+
import mediapipe as mp # Pose estimation
|
6 |
+
from streamlit_webrtc import webrtc_streamer, WebRtcMode # WebRTC integration
|
7 |
|
8 |
+
# Initialize MediaPipe Pose components
|
9 |
mp_pose = mp.solutions.pose
|
10 |
mp_drawing = mp.solutions.drawing_utils
|
11 |
|
12 |
+
# Initialize session state variables
|
|
|
|
|
13 |
if 'posture_status' not in st.session_state:
|
14 |
+
st.session_state.posture_status = "Please enable camera for analysis"
|
|
|
|
|
15 |
|
16 |
def analyze_posture(image):
|
17 |
+
"""
|
18 |
+
Analyze posture using MediaPipe Pose
|
19 |
+
Args:
|
20 |
+
image: Input frame from camera
|
21 |
+
Returns:
|
22 |
+
annotated_image: Frame with pose landmarks drawn
|
23 |
+
posture_status: Analysis results text
|
24 |
+
"""
|
25 |
with mp_pose.Pose(
|
26 |
min_detection_confidence=0.5,
|
27 |
min_tracking_confidence=0.5,
|
28 |
model_complexity=1
|
29 |
) as pose:
|
30 |
|
31 |
+
# Convert color space and process frame
|
32 |
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
33 |
results = pose.process(image_rgb)
|
34 |
|
35 |
annotated_image = image.copy()
|
36 |
+
|
37 |
if results.pose_landmarks:
|
38 |
+
# Draw pose landmarks on the frame
|
39 |
mp_drawing.draw_landmarks(
|
40 |
annotated_image,
|
41 |
results.pose_landmarks,
|
|
|
45 |
)
|
46 |
posture_status = check_posture(results.pose_landmarks, image.shape)
|
47 |
else:
|
48 |
+
posture_status = "No pose detected - ensure full body is visible"
|
49 |
|
50 |
return annotated_image, posture_status
|
51 |
|
52 |
def check_posture(landmarks, image_shape):
|
53 |
+
"""
|
54 |
+
Analyze body landmarks and generate posture report
|
55 |
+
Args:
|
56 |
+
landmarks: Detected pose landmarks
|
57 |
+
image_shape: Dimensions of input image
|
58 |
+
Returns:
|
59 |
+
Formatted posture analysis report
|
60 |
+
"""
|
61 |
h, w, _ = image_shape
|
62 |
|
63 |
+
# Get key body points
|
64 |
left_shoulder = landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER]
|
65 |
right_shoulder = landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER]
|
66 |
left_hip = landmarks.landmark[mp_pose.PoseLandmark.LEFT_HIP]
|
|
|
69 |
right_ear = landmarks.landmark[mp_pose.PoseLandmark.RIGHT_EAR]
|
70 |
nose = landmarks.landmark[mp_pose.PoseLandmark.NOSE]
|
71 |
|
72 |
+
# Determine if sitting or standing
|
73 |
sitting = left_hip.y < left_shoulder.y + 0.1 or right_hip.y < right_shoulder.y + 0.1
|
74 |
|
75 |
messages = []
|
76 |
|
77 |
+
# Forward head posture check
|
78 |
head_forward = (left_ear.y > left_shoulder.y + 0.1 or right_ear.y > right_shoulder.y + 0.1) and \
|
79 |
(nose.y > left_shoulder.y or nose.y > right_shoulder.y)
|
80 |
if head_forward:
|
81 |
+
messages.append("• Forward head tilt detected (text neck)")
|
82 |
|
83 |
+
# Rounded shoulders check
|
84 |
shoulders_rounded = left_shoulder.x > left_hip.x + 0.05 or right_shoulder.x < right_hip.x - 0.05
|
85 |
if shoulders_rounded:
|
86 |
+
messages.append("• Rounded shoulders detected")
|
87 |
|
88 |
+
# Side tilt check
|
89 |
shoulder_diff = abs(left_shoulder.y - right_shoulder.y)
|
90 |
hip_diff = abs(left_hip.y - right_hip.y)
|
91 |
if shoulder_diff > 0.05 or hip_diff > 0.05:
|
92 |
+
messages.append("• Body leaning to one side")
|
93 |
|
94 |
+
# Pelvis position check
|
95 |
if sitting and (left_hip.y < left_shoulder.y + 0.15 or right_hip.y < right_shoulder.y + 0.15):
|
96 |
+
messages.append("• Pelvis tilted forward (sitting posture)")
|
97 |
|
98 |
# Generate final report
|
99 |
if messages:
|
100 |
report = [
|
101 |
+
f"**{'Sitting' if sitting else 'Standing'} posture issues detected:**",
|
102 |
*messages,
|
103 |
+
"\n**Recommendations:**",
|
104 |
+
"• Keep head straight - ears over shoulders",
|
105 |
+
"• Pull shoulders back and down",
|
106 |
+
"• Maintain straight back, avoid side leaning",
|
107 |
+
"• When sitting, support weight on sitting bones"
|
108 |
]
|
109 |
else:
|
110 |
report = [
|
111 |
+
f"**Excellent {'sitting' if sitting else 'standing'} posture!**",
|
112 |
+
"All key points are properly aligned",
|
113 |
+
"\n**Tips:**",
|
114 |
+
"• Continue monitoring your posture daily"
|
115 |
]
|
116 |
|
117 |
return "\n\n".join(report)
|
118 |
|
119 |
def video_frame_callback(frame):
|
120 |
+
"""
|
121 |
+
Callback function for processing each video frame
|
122 |
+
Args:
|
123 |
+
frame: Incoming video frame from WebRTC
|
124 |
+
Returns:
|
125 |
+
Processed video frame with pose landmarks
|
126 |
+
"""
|
127 |
img = frame.to_ndarray(format="bgr24")
|
128 |
|
129 |
try:
|
130 |
+
# Analyze posture and update session state
|
131 |
analyzed_img, posture_status = analyze_posture(img)
|
132 |
+
st.session_state.posture_status = posture_status
|
|
|
|
|
133 |
return av.VideoFrame.from_ndarray(analyzed_img, format="bgr24")
|
134 |
except Exception as e:
|
135 |
+
st.error(f"Processing error: {str(e)}")
|
136 |
+
return frame
|
137 |
|
138 |
def main():
|
139 |
+
# Configure Streamlit page
|
140 |
st.set_page_config(layout="wide")
|
141 |
+
st.title("📷 Real-time Posture Analysis")
|
142 |
|
143 |
+
# Create two-column layout
|
144 |
col1, col2 = st.columns([2, 1])
|
145 |
|
146 |
with col1:
|
147 |
+
st.header("Camera Feed")
|
148 |
|
149 |
+
# WebRTC streamer component
|
150 |
+
webrtc_ctx = webrtc_streamer(
|
151 |
+
key="posture-analysis",
|
152 |
+
mode=WebRtcMode.SENDRECV,
|
153 |
+
video_frame_callback=video_frame_callback,
|
154 |
+
media_stream_constraints={"video": True, "audio": False},
|
155 |
+
async_processing=True,
|
156 |
+
rtc_configuration={
|
157 |
+
"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
|
158 |
+
}
|
159 |
+
)
|
160 |
+
|
161 |
+
# Handle camera state
|
162 |
+
if not webrtc_ctx.state.playing:
|
163 |
+
st.session_state.posture_status = "Camera is off - please enable access"
|
164 |
+
st.warning("Please allow camera permissions when prompted")
|
|
|
|
|
|
|
|
|
165 |
|
166 |
with col2:
|
167 |
+
st.header("Posture Analysis")
|
168 |
+
st.markdown(st.session_state.posture_status)
|
|
|
169 |
|
170 |
if __name__ == "__main__":
|
171 |
main()
|