from flask import Flask, render_template, request, jsonify, send_from_directory, url_for
from flask_cors import CORS
import cv2
import torch
import numpy as np
import os
from werkzeug.utils import secure_filename
import sys
import traceback
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
import time
import tensorflow_hub as hub

# Check GPU availability
print("[GPU] Checking GPU availability...")
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"[GPU] Found {len(gpus)} GPU(s):")
    for gpu in gpus:
        print(f"[GPU] {gpu}")
    # Enable memory growth to avoid allocating all GPU memory at once
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    print("[GPU] Memory growth enabled for all GPUs")
else:
    print("[GPU] No GPU found, will use CPU")

# Add bodybuilding_pose_analyzer to path
sys.path.append('.')  # Assuming app.py is at the root of cv.github.io
from bodybuilding_pose_analyzer.src.movenet_analyzer import MoveNetAnalyzer
from bodybuilding_pose_analyzer.src.pose_analyzer import PoseAnalyzer

# Add YOLOv7 to path
# sys.path.append('yolov7')
# from yolov7.models.experimental import attempt_load
# from yolov7.utils.general import check_img_size, non_max_suppression_kpt, scale_coords
# from yolov7.utils.torch_utils import select_device
# from yolov7.utils.plots import plot_skeleton_kpts

def wrap_text(text: str, font_face: int, font_scale: float, thickness: int, max_width: int) -> list[str]:
    """Wrap text so that each rendered line fits within max_width pixels."""
    if not text:
        return []
    lines = []
    words = text.split(' ')
    current_line = ''
    for word in words:
        # Check whether current_line + word still fits
        test_line = current_line + word + ' '
        (text_width, _), _ = cv2.getTextSize(test_line.strip(), font_face, font_scale, thickness)
        if text_width <= max_width:
            current_line = test_line
        else:
            # Word doesn't fit, so current_line (without the new word) is a complete line
            if current_line.strip():
                lines.append(current_line.strip())
            # Start a new line with the current word
            current_line = word + ' '
            # If a single word is too long it will still overflow; breaking the word is a future enhancement
            (single_word_width, _), _ = cv2.getTextSize(word.strip(), font_face, font_scale, thickness)
            if single_word_width > max_width:
                # For now, add the long word as its own line and let it overflow (or truncate it later)
                lines.append(word.strip())
                current_line = ''  # Reset current_line as the long word is handled
    if current_line.strip():  # Add the last line
        lines.append(current_line.strip())
    return lines if lines else [text]  # Ensure at least the original text is returned if no wrapping happens

app = Flask(__name__, static_url_path='/static', static_folder='static')
CORS(app, resources={r"/*": {"origins": "*"}})
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100 MB max file size

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Initialize YOLOv7 model
# device = select_device('')
# yolo_model = None  # Initialize as None
# stride = None
# imgsz = None
# try:
#     yolo_model = attempt_load('yolov7-w6-pose.pt', map_location=device)
#     stride = int(yolo_model.stride.max())
#     imgsz = check_img_size(640, s=stride)
#     print("YOLOv7 Model loaded successfully")
# except Exception as e:
#     print(f"Error loading YOLOv7 model: {e}")
#     traceback.print_exc()
#     # Not raising here, to allow the app to run if only MoveNet is used. The error will surface if YOLOv7 is selected.

# YOLOv7 pose model expects 17 keypoints
# kpt_shape = (17, 3)

# Load CNN model for bodybuilding pose classification
cnn_model_path = 'external/BodybuildingPoseClassifier/bodybuilding_pose_classifier.h5'
cnn_model = load_model(cnn_model_path)
cnn_class_labels = ['side_chest', 'front_double_biceps', 'back_double_biceps', 'front_lat_spread', 'back_lat_spread']

def predict_pose_cnn(img_path):
    try:
        if gpus:
            print("[CNN_DEBUG] Using GPU for CNN prediction")
            with tf.device('/GPU:0'):
                img = image.load_img(img_path, target_size=(150, 150))
                img_array = image.img_to_array(img)
                img_array = np.expand_dims(img_array, axis=0) / 255.0
                predictions = cnn_model.predict(img_array)
                predicted_class = np.argmax(predictions, axis=1)
                confidence = float(np.max(predictions))
        else:
            print("[CNN_DEBUG] No GPU found, using CPU for CNN prediction")
            with tf.device('/CPU:0'):
                img = image.load_img(img_path, target_size=(150, 150))
                img_array = image.img_to_array(img)
                img_array = np.expand_dims(img_array, axis=0) / 255.0
                predictions = cnn_model.predict(img_array)
                predicted_class = np.argmax(predictions, axis=1)
                confidence = float(np.max(predictions))
        print(f"[CNN_DEBUG] Prediction successful: {cnn_class_labels[predicted_class[0]]}")
        return cnn_class_labels[predicted_class[0]], confidence
    except Exception as e:
        print(f"[CNN_ERROR] Exception during CNN prediction: {e}")
        traceback.print_exc()
        raise

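# NOTE: The original route decorators appear to have been lost when this file was extracted.
# The path below is an assumption; url_for('serve_video', ...) elsewhere in this file only
# requires that the endpoint be registered under the name 'serve_video'.
@app.route('/uploads/<path:filename>')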
def serve_video(filename):
    response = send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=False)
    # Ensure correct content type, especially for Safari/iOS if issues arise
    if filename.lower().endswith('.mp4'):
        response.headers['Content-Type'] = 'video/mp4'
    return response

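# NOTE: decorator assumed (lost in extraction); the signature and the CORS header handling
# below match Flask's after_request hook.
@app.after_request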
def after_request(response):
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization,X-Requested-With,Accept')
    response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
    return response

# def process_video_yolov7(video_path):  # Renamed from process_video
#     global yolo_model, imgsz, stride  # Ensure the global model is used
#     if yolo_model is None:
#         raise RuntimeError("YOLOv7 model failed to load. Cannot process video.")
#     try:
#         if not os.path.exists(video_path):
#             raise FileNotFoundError(f"Video file not found: {video_path}")
#
#         cap = cv2.VideoCapture(video_path)
#         if not cap.isOpened():
#             raise ValueError(f"Failed to open video file: {video_path}")
#
#         fps = int(cap.get(cv2.CAP_PROP_FPS))
#         width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#         height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#
#         print(f"Processing video: {width}x{height} @ {fps}fps")
#
#         # Create output video writer
#         output_path = os.path.join(app.config['UPLOAD_FOLDER'], 'output.mp4')
#         fourcc = cv2.VideoWriter_fourcc(*'avc1')
#         out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
#
#         frame_count = 0
#         while cap.isOpened():
#             ret, frame = cap.read()
#             if not ret:
#                 break
#
#             frame_count += 1
#             print(f"Processing frame {frame_count}")
#
#             # Prepare image
#             img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#             img = cv2.resize(img, (imgsz, imgsz))
#             img = img.transpose((2, 0, 1))  # HWC to CHW
#             img = np.ascontiguousarray(img)
#             img = torch.from_numpy(img).to(device)
#             img = img.float() / 255.0
#             if img.ndimension() == 3:
#                 img = img.unsqueeze(0)
#
#             # Inference
#             with torch.no_grad():
#                 pred = yolo_model(img)[0]  # Use yolo_model
#                 pred = non_max_suppression_kpt(pred, conf_thres=0.25, iou_thres=0.45, nc=yolo_model.yaml['nc'], kpt_label=True)
#
#             # Draw results
#             output_frame = frame.copy()
#             poses_detected = False
#             for det in pred:
#                 if len(det):
#                     poses_detected = True
#                     det[:, :4] = scale_coords(img.shape[2:], det[:, :4], frame.shape).round()
#                     for row in det:
#                         xyxy = row[:4]
#                         conf = row[4]
#                         cls = row[5]
#                         kpts = row[6:]
#                         kpts = torch.tensor(kpts).view(kpt_shape)
#                         output_frame = plot_skeleton_kpts(output_frame, kpts, steps=3, orig_shape=output_frame.shape[:2])
#
#             if not poses_detected:
#                 print(f"No poses detected in frame {frame_count}")
#
#             out.write(output_frame)
#
#         cap.release()
#         out.release()
#
#         if frame_count == 0:
#             raise ValueError("No frames were processed from the video")
#
#         print(f"Video processing completed. Processed {frame_count} frames")
#         # Return a URL for the client, using the 'serve_video' endpoint
#         output_filename = 'output.mp4'
#         return url_for('serve_video', filename=output_filename, _external=False)
#     except Exception as e:
#         print('Error in process_video:', e)
#         traceback.print_exc()
#         raise

def process_video_movenet(video_path):
    try:
        print("[DEBUG] Starting MoveNet video processing")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Could not open video file")

        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"[DEBUG] Video properties - FPS: {fps}, Width: {width}, Height: {height}, Total Frames: {total_frames}")

        # Initialize MoveNet model on GPU if available
        print("[DEBUG] Initializing MoveNet model")
        if gpus:
            print("[DEBUG] Using GPU for MoveNet")
            with tf.device('/GPU:0'):
                movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
                movenet = movenet_model.signatures['serving_default']
        else:
            print("[DEBUG] No GPU found, using CPU for MoveNet")
            with tf.device('/CPU:0'):
                movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
                movenet = movenet_model.signatures['serving_default']

        # Create output video writer
        output_filename = 'output_movenet_lightning.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        print(f"Output path: {output_path}")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")

        frame_count = 0
        processed_frames = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % 10 != 0:  # Process every 10th frame
                continue
            try:
                # Resize and pad the image to keep the aspect ratio
                img = frame.copy()
                img = tf.image.resize_with_pad(tf.expand_dims(img, axis=0), 192, 192)
                img = tf.cast(img, dtype=tf.int32)

                # Run inference on GPU if available
                if gpus:
                    with tf.device('/GPU:0'):
                        results = movenet(img)
                        keypoints = results['output_0'].numpy()
                else:
                    with tf.device('/CPU:0'):
                        results = movenet(img)
                        keypoints = results['output_0'].numpy()

                # Process keypoints and draw them on the frame
                y, x, c = frame.shape
                shaped = np.squeeze(keypoints)
                for kp in range(17):
                    ky, kx, kp_conf = shaped[kp]
                    if kp_conf > 0.3:
                        cx, cy = int(kx * x), int(ky * y)
                        cv2.circle(frame, (cx, cy), 6, (0, 255, 0), -1)

                out.write(frame)
                processed_frames += 1
            except Exception as e:
                print(f"[ERROR] Error processing frame {frame_count}: {str(e)}")
                continue

        cap.release()
        out.release()
        print(f"[DEBUG] Processed {processed_frames} frames out of {total_frames} total frames")
        return output_filename
    except Exception as e:
        print(f"[ERROR] Error in process_video_movenet: {str(e)}")
        traceback.print_exc()
        raise

def process_video_mediapipe(video_path):
    try:
        print(f"[PROCESS_VIDEO_MEDIAPIPE] Called with video_path: {video_path}")
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        analyzer = PoseAnalyzer()
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")

        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Add panel width to the total output width
        panel_width = 300
        total_width = width + panel_width
        print(f"Processing video with MediaPipe: {width}x{height} @ {fps}fps")

        output_filename = 'output_mediapipe.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (total_width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")

        frame_count = 0
        current_pose = 'Uncertain'  # Initial pose label for MediaPipe
        segment_length = 4 * fps if fps > 0 else 120
        cnn_pose = None
        last_valid_landmarks = None
        analysis_results = {'error': 'Processing not started'}  # Initialize analysis_results

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % 30 == 0:
                print(f"Processing frame {frame_count}")

            # Process frame with MediaPipe
            processed_frame, current_analysis_results, landmarks = analyzer.process_frame(frame, last_valid_landmarks=last_valid_landmarks)
            analysis_results = current_analysis_results  # Update with the latest analysis
            if landmarks:
                last_valid_landmarks = landmarks

            # CNN prediction (every 4 seconds)
            if (frame_count - 1) % segment_length == 0:
                temp_img_path = f'temp_frame_for_cnn_{frame_count}.jpg'  # Unique temp name
                cv2.imwrite(temp_img_path, frame)
                try:
                    cnn_pose_pred, cnn_conf = predict_pose_cnn(temp_img_path)
                    print(f"[CNN] Frame {frame_count}: Pose: {cnn_pose_pred}, Conf: {cnn_conf:.2f}")
                    if cnn_conf >= 0.3:
                        current_pose = cnn_pose_pred  # Update current_pose to be displayed
                except Exception as e:
                    print(f"[CNN] Error predicting pose on frame {frame_count}: {e}")
                finally:
                    if os.path.exists(temp_img_path):
                        os.remove(temp_img_path)
            # Create side panel
            panel = np.zeros((height, panel_width, 3), dtype=np.uint8)

            # --- Dynamic text parameter calculations ---
            current_font = cv2.FONT_HERSHEY_DUPLEX
            # Base font scale and reference video height for scaling.
            # Adjust base_font_scale_at_ref_height if text is generally too large or too small.
            base_font_scale_at_ref_height = 0.6
            reference_height_for_font_scale = 640.0  # e.g., a common video height like 480p or 720p

            # Calculate the dynamic font_scale and clamp it to a min/max range to avoid extremes
            font_scale = (height / reference_height_for_font_scale) * base_font_scale_at_ref_height
            font_scale = max(0.4, min(font_scale, 1.2))

            # Calculate dynamic thickness
            thickness = 1 if font_scale < 0.7 else 2

            # Calculate dynamic line_height from the actual rendered text height,
            # using a sample string like "Ag" which has ascenders and descenders
            (_, text_actual_height), _ = cv2.getTextSize("Ag", current_font, font_scale, thickness)
            line_spacing_factor = 1.8  # Adjust for more or less space between lines
            line_height = int(text_actual_height * line_spacing_factor)
            line_height = max(line_height, 15)  # Ensure a minimum line height

            # Initial y offset for the first line of text, accounting for the top margin and text height
            y_offset_panel = max(line_height, 20)
            # --- End of dynamic text parameter calculations ---

            cv2.putText(panel, "Model: Gladiator SupaDot", (10, y_offset_panel), current_font, font_scale, (0, 255, 255), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += line_height

            if frame_count % 30 == 0:  # Print every 30 frames to avoid flooding the console
                print(f"[MEDIAPIPE_PANEL] Frame {frame_count} - Current Pose for Panel: {current_pose}")
            cv2.putText(panel, f"Pose: {current_pose}", (10, y_offset_panel), current_font, font_scale, (255, 0, 0), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += int(line_height * 1.5)

            if 'error' not in analysis_results:
                cv2.putText(panel, "Angles:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                for joint, angle in analysis_results.get('angles', {}).items():
                    text_to_display = f"{joint.capitalize()}: {angle:.1f} deg"
                    cv2.putText(panel, text_to_display, (20, y_offset_panel), current_font, font_scale, (0, 255, 0), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height

                if analysis_results.get('corrections'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Corrections:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for correction in analysis_results.get('corrections', []):
                        cv2.putText(panel, f"• {correction}", (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height

                # Display notes if any
                if analysis_results.get('notes'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Notes:", (10, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)  # Grey color for notes
                    y_offset_panel += line_height
                    for note in analysis_results.get('notes', []):
                        cv2.putText(panel, f"• {note}", (20, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height
            else:
                cv2.putText(panel, "Error:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                cv2.putText(panel, analysis_results.get('error', 'Unknown error'), (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)

            combined_frame = np.hstack((processed_frame, panel))  # Use processed_frame from the analyzer
            out.write(combined_frame)

        cap.release()
        out.release()
        if frame_count == 0:
            raise ValueError("No frames were processed from the video by MediaPipe")
        print(f"MediaPipe video processing completed. Processed {frame_count} frames. Output: {output_path}")
        return url_for('serve_video', filename=output_filename, _external=False)
    except Exception as e:
        print(f'Error in process_video_mediapipe: {e}')
        traceback.print_exc()
        raise

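# NOTE: route decorator assumed; the original appears to have been dropped during extraction.
@app.route('/')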
def index():
    return render_template('index.html')

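# NOTE: route decorator assumed (the exact upload path is not recoverable from this file);
# the client-side form is expected to POST the video to this endpoint.
@app.route('/upload', methods=['POST'])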
def upload_file():
    try:
        if 'video' not in request.files:
            print("[UPLOAD] No video file in request")
            return jsonify({'error': 'No video file provided'}), 400

        file = request.files['video']
        if file.filename == '':
            print("[UPLOAD] Empty filename")
            return jsonify({'error': 'No selected file'}), 400

        if file:
            allowed_extensions = {'mp4', 'avi', 'mov', 'mkv'}
            if '.' not in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions:
                print(f"[UPLOAD] Invalid file format: {file.filename}")
                return jsonify({'error': 'Invalid file format. Allowed formats: mp4, avi, mov, mkv'}), 400

            # Ensure the filename is properly sanitized
            filename = secure_filename(file.filename)
            print(f"[UPLOAD] Original filename: {file.filename}")
            print(f"[UPLOAD] Sanitized filename: {filename}")

            # Create a unique filename to prevent conflicts
            base, ext = os.path.splitext(filename)
            unique_filename = f"{base}_{int(time.time())}{ext}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
            print(f"[UPLOAD] Saving file to: {filepath}")
            file.save(filepath)

            if not os.path.exists(filepath):
                print(f"[UPLOAD] File not found after save: {filepath}")
                return jsonify({'error': 'Failed to save uploaded file'}), 500
            print(f"[UPLOAD] File saved successfully. Size: {os.path.getsize(filepath)} bytes")

            try:
                model_choice = request.form.get('model_choice', 'Gladiator SupaDot')
                print(f"[UPLOAD] Processing with model: {model_choice}")
                if model_choice == 'movenet':
                    movenet_variant = request.form.get('movenet_variant', 'lightning')
                    print(f"[UPLOAD] Using MoveNet variant: {movenet_variant}")
                    output_path_url = process_video_movenet(filepath)
                else:
                    output_path_url = process_video_mediapipe(filepath)

                print(f"[UPLOAD] Processing complete. Output URL: {output_path_url}")
                if not os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], os.path.basename(output_path_url))):
                    print(f"[UPLOAD] Output file not found: {output_path_url}")
                    return jsonify({'error': 'Output video file not found'}), 500

                return jsonify({
                    'message': f'Video processed successfully with {model_choice}',
                    'output_path': output_path_url
                })
            except Exception as e:
                print(f"[UPLOAD] Error processing video: {str(e)}")
                traceback.print_exc()
                return jsonify({'error': f'Error processing video: {str(e)}'}), 500
            finally:
                try:
                    if os.path.exists(filepath):
                        os.remove(filepath)
                        print(f"[UPLOAD] Cleaned up input file: {filepath}")
                except Exception as e:
                    print(f"[UPLOAD] Error cleaning up file: {str(e)}")
    except Exception as e:
        print(f"[UPLOAD] Unexpected error: {str(e)}")
        traceback.print_exc()
        return jsonify({'error': 'Internal server error'}), 500

if __name__ == '__main__':
    # Use port 7860 and debug=False for Hugging Face Spaces deployment
    app.run(host='0.0.0.0', port=7860, debug=False)