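"""Flask app for bodybuilding pose analysis, packaged for Hugging Face Spaces.

Exposes an upload endpoint that runs either a TF Hub MoveNet model or a
MediaPipe-based analyzer over an uploaded video, draws keypoints and feedback
onto the frames, and serves the processed clip back. A small Keras CNN
classifies the bodybuilding pose on periodic frames.
"""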
# Standard library
import gc
import logging
import os
import sys
import time
import traceback

# Third-party
import cv2
import numpy as np
import psutil
import tensorflow as tf
import tensorflow_hub as hub
from flask import Flask, render_template, request, jsonify, send_from_directory, url_for
from flask_cors import CORS
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from werkzeug.utils import secure_filename
# Check GPU availability
print("[GPU] Checking GPU availability...")
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"[GPU] Found {len(gpus)} GPU(s):")
    for gpu in gpus:
        print(f"[GPU] {gpu}")
    # Enable memory growth to avoid allocating all GPU memory at once
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    print("[GPU] Memory growth enabled for all GPUs")
else:
    print("[GPU] No GPU found, will use CPU")
# Add bodybuilding_pose_analyzer to path
sys.path.append('.') # Assuming app.py is at the root of cv.github.io
from bodybuilding_pose_analyzer.src.movenet_analyzer import MoveNetAnalyzer
from bodybuilding_pose_analyzer.src.pose_analyzer import PoseAnalyzer
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
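# Memory-management helpers: Spaces containers have limited RAM, so we log the
# process RSS via psutil and force garbage collection between heavy steps.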
def log_memory_usage():
    """Log current memory usage."""
    process = psutil.Process()
    memory_info = process.memory_info()
    logger.info(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")

def cleanup_memory():
    """Force garbage collection and log memory usage."""
    gc.collect()
    log_memory_usage()
def wrap_text(text: str, font_face: int, font_scale: float, thickness: int, max_width: int) -> list[str]:
    """Wrap text to fit within max_width pixels."""
    if not text:
        return []
    lines = []
    current_line = ''
    for word in text.split(' '):
        # Check whether current_line + word still fits
        test_line = current_line + word + ' '
        (text_width, _), _ = cv2.getTextSize(test_line.strip(), font_face, font_scale, thickness)
        if text_width <= max_width:
            current_line = test_line
            continue
        # Word doesn't fit, so current_line (without the new word) is a complete line
        if current_line.strip():
            lines.append(current_line.strip())
        # If the word alone is wider than max_width, emit it on its own line and
        # let it overflow; breaking the word apart is a future enhancement.
        (single_word_width, _), _ = cv2.getTextSize(word, font_face, font_scale, thickness)
        if single_word_width > max_width:
            lines.append(word)
            current_line = ''
        else:
            # Start a new line with the current word
            current_line = word + ' '
    if current_line.strip():  # Add the last line
        lines.append(current_line.strip())
    return lines if lines else [text]  # Ensure at least the original text is returned
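# Example (hypothetical values): on a 280px-wide panel,
#   wrap_text("Keep your elbows level with your shoulders", cv2.FONT_HERSHEY_DUPLEX, 0.6, 1, 280)
# returns the sentence split across two or three lines, depending on font metrics.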
app = Flask(__name__, static_url_path='/static', static_folder='static')
CORS(app, resources={r"/*": {"origins": "*"}})
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max file size
# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Load CNN model for bodybuilding pose classification
cnn_model_path = 'external/BodybuildingPoseClassifier/bodybuilding_pose_classifier.h5'
cnn_model = load_model(cnn_model_path)
cnn_class_labels = ['side_chest', 'front_double_biceps', 'back_double_biceps', 'front_lat_spread', 'back_lat_spread']
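# NOTE: this label order must match the class-index mapping the classifier was
# trained with (assumed here to follow the BodybuildingPoseClassifier repo).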
def predict_pose_cnn(img_path):
    try:
        cleanup_memory()  # Clean up before prediction
        device = '/GPU:0' if gpus else '/CPU:0'
        logger.info(f"[CNN_DEBUG] Using {'GPU' if gpus else 'CPU'} for CNN prediction")
        with tf.device(device):
            img = image.load_img(img_path, target_size=(150, 150))
            img_array = image.img_to_array(img)
            img_array = np.expand_dims(img_array, axis=0) / 255.0
            predictions = cnn_model.predict(img_array, verbose=0)  # Disable progress bar
        predicted_class = np.argmax(predictions, axis=1)
        confidence = float(np.max(predictions))
        logger.info(f"[CNN_DEBUG] Prediction successful: {cnn_class_labels[predicted_class[0]]}")
        return cnn_class_labels[predicted_class[0]], confidence
    except Exception as e:
        logger.error(f"[CNN_ERROR] Exception during CNN prediction: {e}")
        traceback.print_exc()
        raise
    finally:
        cleanup_memory()  # Clean up after prediction
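# Example (hypothetical path and score):
#   predict_pose_cnn('static/uploads/frame_0001.jpg') -> ('front_double_biceps', 0.87)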
@app.route('/static/uploads/<path:filename>')
def serve_video(filename):
    response = send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=False)
    # Ensure correct content type, especially for Safari/iOS if issues arise
    if filename.lower().endswith('.mp4'):
        response.headers['Content-Type'] = 'video/mp4'
    return response
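# send_from_directory serves files with conditional=True by default in recent
# Flask, so browsers can issue Range requests and seek within the MP4s.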
@app.after_request
def after_request(response):
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization,X-Requested-With,Accept')
    response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
    return response
def process_video_movenet(video_path):
    try:
        print("[DEBUG] Starting MoveNet video processing")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Could not open video file")
        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"[DEBUG] Video properties - FPS: {fps}, Width: {width}, Height: {height}, Total Frames: {total_frames}")
        # Initialize MoveNet model on GPU if available
        print("[DEBUG] Initializing MoveNet model")
        device = '/GPU:0' if gpus else '/CPU:0'
        print(f"[DEBUG] Using {'GPU' if gpus else 'CPU'} for MoveNet")
        with tf.device(device):
            movenet_model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
            movenet = movenet_model.signatures['serving_default']
        # Create output video writer
        output_filename = 'output_movenet_lightning.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        print(f"Output path: {output_path}")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")
        frame_count = 0
        processed_frames = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            # Process every 10th frame; note the output plays back faster than
            # real time because skipped frames are not written.
            if frame_count % 10 != 0:
                continue
            try:
                # Resize and pad the image to keep aspect ratio
                img = frame.copy()
                img = tf.image.resize_with_pad(tf.expand_dims(img, axis=0), 192, 192)
                img = tf.cast(img, dtype=tf.int32)
                # Run inference on GPU if available
                with tf.device(device):
                    results = movenet(img)
                    keypoints = results['output_0'].numpy()
                # Scale normalized keypoints back to pixel coordinates and draw
                y, x, c = frame.shape
                shaped = np.squeeze(keypoints)
                for kp in range(17):
                    ky, kx, kp_conf = shaped[kp]
                    if kp_conf > 0.3:
                        cx, cy = int(kx * x), int(ky * y)
                        cv2.circle(frame, (cx, cy), 6, (0, 255, 0), -1)
                out.write(frame)
                processed_frames += 1
            except Exception as e:
                print(f"[ERROR] Error processing frame {frame_count}: {str(e)}")
                continue
        cap.release()
        out.release()
        print(f"[DEBUG] Processed {processed_frames} frames out of {total_frames} total frames")
        # Return a URL path like process_video_mediapipe does, so the client
        # receives a consistent 'output_path' for both models.
        return url_for('serve_video', filename=output_filename, _external=False)
    except Exception as e:
        print(f"[ERROR] Error in process_video_movenet: {str(e)}")
        traceback.print_exc()
        raise
def process_video_mediapipe(video_path):
    try:
        cleanup_memory()  # Clean up before processing
        logger.info(f"[PROCESS_VIDEO_MEDIAPIPE] Called with video_path: {video_path}")
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")
        analyzer = PoseAnalyzer()
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Add panel width to total width
        panel_width = 300
        total_width = width + panel_width
        logger.info(f"Processing video with MediaPipe: {width}x{height} @ {fps}fps")
        output_filename = 'output_mediapipe.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (total_width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")
        frame_count = 0
        current_pose = 'Uncertain'  # Initial pose for MediaPipe
        segment_length = 4 * fps if fps > 0 else 120  # Re-run the CNN roughly every 4 seconds
        cnn_pose = None
        last_valid_landmarks = None
        analysis_results = {'error': 'Processing not started'}  # Initialize analysis_results
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1
            if frame_count % 30 == 0:
                logger.info(f"Processing frame {frame_count}")
                cleanup_memory()  # Clean up periodically
            # Process frame with MediaPipe
            processed_frame, current_analysis_results, landmarks = analyzer.process_frame(frame, last_valid_landmarks=last_valid_landmarks)
            analysis_results = current_analysis_results  # Update with the latest analysis
            if landmarks:
                last_valid_landmarks = landmarks
            # CNN prediction (every 4 seconds)
            if (frame_count - 1) % segment_length == 0:
                temp_img_path = f'temp_frame_for_cnn_{frame_count}.jpg'  # Unique temp name
                cv2.imwrite(temp_img_path, frame)
                try:
                    cnn_pose_pred, cnn_conf = predict_pose_cnn(temp_img_path)
                    logger.info(f"[CNN] Frame {frame_count}: Pose: {cnn_pose_pred}, Conf: {cnn_conf:.2f}")
                    if cnn_conf >= 0.3:
                        current_pose = cnn_pose_pred  # Update current_pose to be displayed
                except Exception as e:
                    logger.error(f"[CNN] Error predicting pose on frame {frame_count}: {e}")
                finally:
                    if os.path.exists(temp_img_path):
                        os.remove(temp_img_path)
            # Create side panel
            panel = np.zeros((height, panel_width, 3), dtype=np.uint8)
            # --- Dynamic Text Parameter Calculations ---
            current_font = cv2.FONT_HERSHEY_DUPLEX
            # Base font scale and reference video height for scaling
            base_font_scale_at_ref_height = 0.6
            reference_height_for_font_scale = 640.0
            # Calculate dynamic font_scale, clamped to a readable range
            font_scale = (height / reference_height_for_font_scale) * base_font_scale_at_ref_height
            font_scale = max(0.4, min(font_scale, 1.2))
            # Calculate dynamic thickness
            thickness = 1 if font_scale < 0.7 else 2
            # Calculate dynamic line_height from the rendered height of a sample string
            (_, text_actual_height), _ = cv2.getTextSize("Ag", current_font, font_scale, thickness)
            line_spacing_factor = 1.8
            line_height = int(text_actual_height * line_spacing_factor)
            line_height = max(line_height, 15)
            # Initial y_offset
            y_offset_panel = max(line_height, 20)
            cv2.putText(panel, "Model: Gladiator SupaDot", (10, y_offset_panel), current_font, font_scale, (0, 255, 255), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += line_height
            if frame_count % 30 == 0:
                logger.info(f"[MEDIAPIPE_PANEL] Frame {frame_count} - Current Pose for Panel: {current_pose}")
            cv2.putText(panel, f"Pose: {current_pose}", (10, y_offset_panel), current_font, font_scale, (255, 0, 0), thickness, lineType=cv2.LINE_AA)
            y_offset_panel += int(line_height * 1.5)
            if 'error' not in analysis_results:
                cv2.putText(panel, "Angles:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                for joint, angle in analysis_results.get('angles', {}).items():
                    text_to_display = f"{joint.capitalize()}: {angle:.1f} deg"
                    cv2.putText(panel, text_to_display, (20, y_offset_panel), current_font, font_scale, (0, 255, 0), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                # (wrap_text above could be applied to the strings below if they
                # ever overflow the 300px panel.)
                if analysis_results.get('corrections'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Corrections:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for correction in analysis_results.get('corrections', []):
                        # Hershey fonts are ASCII-only, so use "-" rather than a bullet glyph
                        cv2.putText(panel, f"- {correction}", (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height
                if analysis_results.get('notes'):
                    y_offset_panel += line_height
                    cv2.putText(panel, "Notes:", (10, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                    y_offset_panel += line_height
                    for note in analysis_results.get('notes', []):
                        cv2.putText(panel, f"- {note}", (20, y_offset_panel), current_font, font_scale, (200, 200, 200), thickness, lineType=cv2.LINE_AA)
                        y_offset_panel += line_height
            else:
                cv2.putText(panel, "Error:", (10, y_offset_panel), current_font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA)
                y_offset_panel += line_height
                cv2.putText(panel, analysis_results.get('error', 'Unknown error'), (20, y_offset_panel), current_font, font_scale, (0, 0, 255), thickness, lineType=cv2.LINE_AA)
            combined_frame = np.hstack((processed_frame, panel))
            out.write(combined_frame)
        cap.release()
        out.release()
        cleanup_memory()  # Clean up after processing
        if frame_count == 0:
            raise ValueError("No frames were processed from the video by MediaPipe")
        logger.info(f"MediaPipe video processing completed. Processed {frame_count} frames. Output: {output_path}")
        return url_for('serve_video', filename=output_filename, _external=False)
    except Exception as e:
        logger.error(f'Error in process_video_mediapipe: {e}')
        traceback.print_exc()
        raise
    finally:
        cleanup_memory()  # Runs on success and on error
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
    try:
        cleanup_memory()  # Clean up before processing
        if 'video' not in request.files:
            logger.error("[UPLOAD] No video file in request")
            return jsonify({'error': 'No video file provided'}), 400
        file = request.files['video']
        if file.filename == '':
            logger.error("[UPLOAD] Empty filename")
            return jsonify({'error': 'No selected file'}), 400
        if file:
            allowed_extensions = {'mp4', 'avi', 'mov', 'mkv'}
            if '.' not in file.filename or file.filename.rsplit('.', 1)[1].lower() not in allowed_extensions:
                logger.error(f"[UPLOAD] Invalid file format: {file.filename}")
                return jsonify({'error': 'Invalid file format. Allowed formats: mp4, avi, mov, mkv'}), 400
            # Ensure the filename is properly sanitized
            filename = secure_filename(file.filename)
            logger.info(f"[UPLOAD] Original filename: {file.filename}")
            logger.info(f"[UPLOAD] Sanitized filename: {filename}")
            # Create a unique filename to prevent conflicts
            base, ext = os.path.splitext(filename)
            unique_filename = f"{base}_{int(time.time())}{ext}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
            logger.info(f"[UPLOAD] Saving file to: {filepath}")
            file.save(filepath)
            if not os.path.exists(filepath):
                logger.error(f"[UPLOAD] File not found after save: {filepath}")
                return jsonify({'error': 'Failed to save uploaded file'}), 500
            logger.info(f"[UPLOAD] File saved successfully. Size: {os.path.getsize(filepath)} bytes")
            try:
                model_choice = request.form.get('model_choice', 'Gladiator SupaDot')
                logger.info(f"[UPLOAD] Processing with model: {model_choice}")
                if model_choice == 'movenet':
                    movenet_variant = request.form.get('movenet_variant', 'lightning')
                    # (Only the 'lightning' variant is currently wired up in
                    # process_video_movenet; the form value is logged for reference.)
                    logger.info(f"[UPLOAD] Using MoveNet variant: {movenet_variant}")
                    output_path_url = process_video_movenet(filepath)
                else:
                    output_path_url = process_video_mediapipe(filepath)
                logger.info(f"[UPLOAD] Processing complete. Output URL: {output_path_url}")
                if not os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], os.path.basename(output_path_url))):
                    logger.error(f"[UPLOAD] Output file not found: {output_path_url}")
                    return jsonify({'error': 'Output video file not found'}), 500
                return jsonify({
                    'message': f'Video processed successfully with {model_choice}',
                    'output_path': output_path_url
                })
            except Exception as e:
                logger.error(f"[UPLOAD] Error processing video: {str(e)}")
                traceback.print_exc()
                return jsonify({'error': f'Error processing video: {str(e)}'}), 500
            finally:
                try:
                    if os.path.exists(filepath):
                        os.remove(filepath)
                        logger.info(f"[UPLOAD] Cleaned up input file: {filepath}")
                except Exception as e:
                    logger.error(f"[UPLOAD] Error cleaning up file: {str(e)}")
    except Exception as e:
        logger.error(f"[UPLOAD] Unexpected error: {str(e)}")
        traceback.print_exc()
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        cleanup_memory()  # Clean up after processing
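# Example request (hypothetical file name):
#   curl -F "video=@clip.mp4" -F "model_choice=movenet" http://localhost:7860/upload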
# Add error handlers
@app.errorhandler(413)
def request_entity_too_large(error):
    return jsonify({'error': 'File too large. Maximum size is 100MB'}), 413

@app.errorhandler(500)
def internal_server_error(error):
    return jsonify({'error': 'Internal server error. Please try again later.'}), 500

@app.errorhandler(404)
def not_found_error(error):
    return jsonify({'error': 'Resource not found'}), 404
if __name__ == '__main__':
    # Ensure the port is 7860 and debug is False for HF Spaces deployment
    app.run(host='0.0.0.0', port=7860, debug=False)