"""Flask app that annotates uploaded bodybuilding videos with pose information.

Two processing back-ends are available, selected by the ``model_choice`` form
field of ``POST /upload``:

* ``movenet``  - draws MoveNet (TF Hub) keypoints on every frame.
* anything else - runs ``PoseAnalyzer`` on every frame, periodically classifies
  the pose with a Keras CNN, and renders the results in a side panel.
"""
# Patch for Hugging Face Spaces: set MPLCONFIGDIR to avoid permission errors
# with matplotlib. Kept ahead of every other import, as in the original file.
import os

os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.makedirs("/tmp/matplotlib", exist_ok=True)

import gc
import logging
import sys
import tempfile
import time
import traceback
import uuid

# Third-party imports keep their original relative order on purpose.
from flask import Flask, render_template, request, jsonify, send_from_directory, url_for
from flask_cors import CORS
import cv2
import torch
import numpy as np
from werkzeug.utils import secure_filename
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
import tensorflow_hub as hub
import psutil

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)

# Add file handler for persistent logging
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(os.path.join(log_dir, 'app.log'))
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(file_handler)


def log_memory_usage():
    """Log the resident set size of the current process in MB (best effort)."""
    try:
        process = psutil.Process()
        memory_info = process.memory_info()
        logger.info(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")
    except Exception as e:
        logger.error(f"Error logging memory usage: {e}")


def cleanup_memory():
    """Force a garbage collection pass and log memory usage (best effort)."""
    try:
        gc.collect()
        log_memory_usage()
    except Exception as e:
        logger.error(f"Error in cleanup_memory: {e}")


# ---------------------------------------------------------------------------
# GPU configuration
# ---------------------------------------------------------------------------
def _configure_gpus():
    """Enable memory growth on every visible GPU.

    Returns:
        The list of physical GPU devices reported by TensorFlow (empty when
        none are found or when listing them fails).
    """
    devices = []
    try:
        logger.info("[GPU] Checking GPU availability...")
        devices = tf.config.list_physical_devices('GPU')
        if not devices:
            logger.info("[GPU] No GPU found, will use CPU")
            return devices
        logger.info(f"[GPU] Found {len(devices)} GPU(s):")
        for device in devices:
            logger.info(f"[GPU] {device}")
            # Enable memory growth to avoid allocating all GPU memory at once
            tf.config.experimental.set_memory_growth(device, True)
        logger.info("[GPU] Memory growth enabled for all GPUs")
    except Exception as e:
        logger.error(f"Error configuring GPU: {e}")
        logger.error(traceback.format_exc())
    return devices


# Module-level list consulted by predict_pose_cnn() to pick a device.
gpus = _configure_gpus()

# Add bodybuilding_pose_analyzer to path
sys.path.append('.')  # Assuming app.py is at the root of cv.github.io
from bodybuilding_pose_analyzer.src.movenet_analyzer import MoveNetAnalyzer
from bodybuilding_pose_analyzer.src.pose_analyzer import PoseAnalyzer

# ---------------------------------------------------------------------------
# Paths and Flask application
# ---------------------------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, 'static')
UPLOAD_DIR = os.path.join(STATIC_DIR, 'uploads')
MODEL_DIR = os.path.join(BASE_DIR, 'external', 'BodybuildingPoseClassifier')

# Ensure all required directories exist
for directory in [STATIC_DIR, UPLOAD_DIR, MODEL_DIR, log_dir]:
    os.makedirs(directory, exist_ok=True)
    logger.info(f"Ensured directory exists: {directory}")

app = Flask(__name__, static_url_path='/static', static_folder=STATIC_DIR)
CORS(app, resources={r"/*": {"origins": "*"}})
app.config['UPLOAD_FOLDER'] = UPLOAD_DIR
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size

ALLOWED_EXTENSIONS = {'mp4', 'avi', 'mov', 'mkv'}
DEFAULT_FPS = 30  # used when the container does not report a frame rate

# ---------------------------------------------------------------------------
# CNN pose classifier (loaded once at import; a failure aborts start-up)
# ---------------------------------------------------------------------------
try:
    logger.info("Loading CNN model...")
    cnn_model_path = os.path.join(MODEL_DIR, 'bodybuilding_pose_classifier.h5')
    logger.info(f"Looking for model at: {cnn_model_path}")
    # List directory contents to debug
    logger.info(f"Contents of MODEL_DIR: {os.listdir(MODEL_DIR)}")

    if not os.path.exists(cnn_model_path):
        logger.error(f"Model file not found at {cnn_model_path}")
        logger.error(f"Current working directory: {os.getcwd()}")
        logger.error(f"Directory contents: {os.listdir('.')}")
        raise FileNotFoundError(f"CNN model not found at {cnn_model_path}")

    # Check file permissions
    logger.info(f"Model file permissions: {oct(os.stat(cnn_model_path).st_mode)[-3:]}")

    logger.info("Attempting to load model...")
    cnn_model = load_model(cnn_model_path, compile=False)
    logger.info("CNN model loaded successfully")
except Exception as e:
    logger.error(f"Error loading CNN model: {e}")
    logger.error(traceback.format_exc())
    raise

cnn_class_labels = ['side_chest', 'front_double_biceps', 'back_double_biceps',
                    'front_lat_spread', 'back_lat_spread']


def predict_pose_cnn(img_path):
    """Classify the pose shown in the image at ``img_path`` with the CNN.

    The image is loaded at 150x150, scaled to [0, 1] and passed to
    ``cnn_model`` on GPU 0 when a GPU was detected, otherwise on the CPU.

    Args:
        img_path: Path of an image file readable by Keras' ``load_img``.

    Returns:
        ``(label, confidence)`` where ``label`` is an entry of
        ``cnn_class_labels`` and ``confidence`` is the maximum value of the
        model output as a float.

    Raises:
        Exception: any error from loading or predicting is logged and re-raised.
    """
    try:
        cleanup_memory()
        if gpus:
            logger.info("[CNN_DEBUG] Using GPU for CNN prediction")
            device = '/GPU:0'
        else:
            logger.info("[CNN_DEBUG] No GPU found, using CPU for CNN prediction")
            device = '/CPU:0'

        with tf.device(device):
            img = image.load_img(img_path, target_size=(150, 150))
            img_array = image.img_to_array(img)
            img_array = np.expand_dims(img_array, axis=0) / 255.0
            predictions = cnn_model.predict(img_array, verbose=0)

        predicted_index = int(np.argmax(predictions, axis=1)[0])
        confidence = float(np.max(predictions))
        label = cnn_class_labels[predicted_index]
        logger.info(f"[CNN_DEBUG] Prediction successful: {label}")
        return label, confidence
    except Exception as e:
        logger.error(f"[CNN_ERROR] Exception during CNN prediction: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        cleanup_memory()


# ---------------------------------------------------------------------------
# Static video serving and response headers
# ---------------------------------------------------------------------------
# The URL variable is required: the view takes ``filename`` and the processing
# functions build links with url_for('serve_video', filename=...).
@app.route('/static/uploads/<path:filename>', endpoint='serve_video')
def serve_video(filename):
    """Serve a file from the upload folder inline (not as an attachment)."""
    response = send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=False)
    # Ensure correct content type, especially for Safari/iOS if issues arise
    if filename.lower().endswith('.mp4'):
        response.headers['Content-Type'] = 'video/mp4'
    return response


@app.after_request
def after_request(response):
    """Attach the allowed CORS headers/methods to every response.

    ``Access-Control-Allow-Origin`` is deliberately NOT set here: Flask-CORS
    (configured above for all routes) already emits it, and a second value
    makes browsers reject the response.
    """
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type,Authorization,X-Requested-With,Accept'
    response.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
    return response


# ---------------------------------------------------------------------------
# Shared video helpers
# ---------------------------------------------------------------------------
def _read_video_properties(cap):
    """Return ``(fps, width, height)`` of an opened capture as ints.

    Falls back to ``DEFAULT_FPS`` when the reported frame rate is not positive,
    because the value is passed on to ``cv2.VideoWriter``.
    """
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        logger.warning(f"Video reports FPS={fps}; falling back to {DEFAULT_FPS}")
        fps = DEFAULT_FPS
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    return fps, width, height


def _release_video_handles(*handles):
    """Release every non-None capture/writer, logging (not raising) failures."""
    for handle in handles:
        if handle is None:
            continue
        try:
            handle.release()
        except Exception as e:
            logger.error(f"Error releasing video handle: {e}")


# ---------------------------------------------------------------------------
# MoveNet processing
# ---------------------------------------------------------------------------
MOVENET_MODEL_URL = "https://tfhub.dev/google/movenet/singlepose/lightning/4"
MOVENET_INPUT_SIZE = 192
MOVENET_KEYPOINT_COUNT = 17
MOVENET_MIN_CONFIDENCE = 0.3

# Cached so the model is fetched/loaded once per process instead of per request.
# The module object is kept alongside the signature so it stays referenced.
_movenet_module = None
_movenet_signature = None


def _get_movenet():
    """Return the MoveNet ``serving_default`` signature, loading it on first use."""
    global _movenet_module, _movenet_signature
    if _movenet_signature is None:
        # Force MoveNet to CPU to avoid GPU JIT error
        with tf.device('/CPU:0'):
            logger.info("[MOVENET] Loading MoveNet model...")
            _movenet_module = hub.load(MOVENET_MODEL_URL)
            _movenet_signature = _movenet_module.signatures['serving_default']
            logger.info("[MOVENET] MoveNet model loaded.")
    return _movenet_signature


def _keypoint_to_pixel(ky, kx, frame_height, frame_width):
    """Map a normalised MoveNet keypoint back to pixel coordinates of the frame.

    The frame is letterboxed to a square before inference, so the normalised
    coordinates are relative to a square of side ``max(height, width)`` with
    the frame centred in it; the padding offset must be subtracted again.

    Returns:
        ``(x, y)`` integer pixel coordinates in the original frame.
    """
    side = max(frame_height, frame_width)
    px = int(round(kx * side - (side - frame_width) / 2.0))
    py = int(round(ky * side - (side - frame_height) / 2.0))
    return px, py


def process_video_movenet(video_path):
    """Draw MoveNet keypoints on every frame of ``video_path``.

    The annotated video is written to ``output_movenet_lightning.mp4`` in the
    upload folder. Frames whose inference or drawing fails are logged and
    skipped.

    NOTE(review): the output filename is fixed, so concurrent requests would
    overwrite each other's result.

    Args:
        video_path: Path of the input video.

    Returns:
        URL (relative) of the annotated video, built with ``url_for``.

    Raises:
        ValueError: if the video cannot be opened, the writer cannot be
            created, or the output is missing/empty.
        Exception: any other error is logged and re-raised.
    """
    cap = None
    out = None
    try:
        logger.info("[MOVENET] Starting MoveNet video processing")
        logger.debug("[MOVENET] Python %s, OpenCV %s, TensorFlow %s",
                     sys.version, cv2.__version__, tf.__version__)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"[MOVENET] Could not open video file: {video_path}")
            raise ValueError("Could not open video file")

        fps, width, height = _read_video_properties(cap)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        logger.info(f"[MOVENET] Video properties - FPS: {fps}, Width: {width}, "
                    f"Height: {height}, Total Frames: {total_frames}")

        try:
            movenet = _get_movenet()
        except Exception as e:
            logger.error(f"[MOVENET] Exception during MoveNet model load: {e}")
            logger.error(traceback.format_exc())
            raise

        output_filename = 'output_movenet_lightning.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            logger.error(f"[MOVENET] Failed to create output video writer at {output_path}")
            raise ValueError(f"Failed to create output video writer at {output_path}")

        frame_count = 0
        processed_frames = 0
        while True:
            # Read outside the per-frame try: a failing read must end the loop
            # (or propagate) rather than be retried forever.
            ret, frame = cap.read()
            if not ret or frame is None:
                logger.debug("[MOVENET] Stopping at frame %d", frame_count + 1)
                break
            frame_count += 1

            try:
                # Ensure frame size matches VideoWriter
                if frame.shape[1] != width or frame.shape[0] != height:
                    logger.warning(
                        f"[MOVENET] Frame size {frame.shape[1]}x{frame.shape[0]} does not match "
                        f"VideoWriter size {width}x{height}. Resizing.")
                    frame = cv2.resize(frame, (width, height))

                # OpenCV decodes to BGR; MoveNet expects RGB input.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Resize and pad the image to keep aspect ratio
                model_input = tf.image.resize_with_pad(
                    tf.expand_dims(rgb, axis=0), MOVENET_INPUT_SIZE, MOVENET_INPUT_SIZE)
                model_input = tf.cast(model_input, dtype=tf.int32)

                # Always run inference on CPU
                with tf.device('/CPU:0'):
                    results = movenet(model_input)
                keypoints = np.squeeze(results['output_0'].numpy())

                frame_height, frame_width = frame.shape[:2]
                for ky, kx, kp_conf in keypoints[:MOVENET_KEYPOINT_COUNT]:
                    if kp_conf > MOVENET_MIN_CONFIDENCE:
                        center = _keypoint_to_pixel(ky, kx, frame_height, frame_width)
                        cv2.circle(frame, center, 6, (0, 255, 0), -1)

                out.write(frame)
                processed_frames += 1
            except Exception as e:
                logger.error(f"[MOVENET] Exception while processing frame {frame_count}: {e}")
                logger.error(traceback.format_exc())
                continue

        # Release before inspecting the output so the writer has finished the file.
        _release_video_handles(cap, out)
        logger.info(f"[MOVENET] Processed {processed_frames} frames out of {total_frames} total frames")

        if not os.path.exists(output_path):
            logger.error(f"[MOVENET] Output video file was not created: {output_path}")
            raise ValueError(f"Output video file was not created: {output_path}")

        file_size = os.path.getsize(output_path)
        logger.info(f"[MOVENET] Output video file size: {file_size} bytes")
        if processed_frames == 0 or file_size < 1000:
            logger.error(f"[MOVENET] Output video file is empty or too small: {output_path}")
            raise ValueError(f"Output video file is empty or too small: {output_path}")

        video_url = url_for('serve_video', filename=output_filename, _external=False)
        logger.info(f"[MOVENET] Returning video URL: {video_url}")
        return video_url
    except Exception as e:
        logger.error(f"[MOVENET] Uncaught exception in process_video_movenet: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        # Safe to call again after the in-line release above.
        _release_video_handles(cap, out)


# ---------------------------------------------------------------------------
# MediaPipe processing
# ---------------------------------------------------------------------------
PANEL_WIDTH = 300
CNN_MIN_CONFIDENCE = 0.3


def _panel_text_style(height):
    """Compute the text style for the side panel from the video height.

    Returns:
        ``(font, font_scale, thickness, line_height)``. The scale is 0.6 at a
        height of 640 px, scaled linearly and clamped to [0.4, 1.2].
    """
    font = cv2.FONT_HERSHEY_DUPLEX
    font_scale = max(0.4, min((height / 640.0) * 0.6, 1.2))
    thickness = 1 if font_scale < 0.7 else 2
    (_, text_height), _ = cv2.getTextSize("Ag", font, font_scale, thickness)
    line_height = max(int(text_height * 1.8), 15)
    return font, font_scale, thickness, line_height


def _draw_analysis_panel(height, text_style, current_pose, analysis_results):
    """Render the side panel (model name, pose, angles, corrections, notes).

    Args:
        height: Panel height in pixels (same as the video height).
        text_style: Tuple returned by ``_panel_text_style``.
        current_pose: Pose label to display.
        analysis_results: Dict from the analyzer. When it has an ``'error'``
            key only the error is shown; otherwise the ``'angles'``,
            ``'corrections'`` and ``'notes'`` entries are listed if present.

    Returns:
        A ``(height, PANEL_WIDTH, 3)`` uint8 image.
    """
    font, font_scale, thickness, line_height = text_style
    panel = np.zeros((height, PANEL_WIDTH, 3), dtype=np.uint8)

    def put(text, x, y, color):
        cv2.putText(panel, text, (x, y), font, font_scale, color, thickness, lineType=cv2.LINE_AA)

    y = max(line_height, 20)
    put("Model: Gladiator SupaDot", 10, y, (0, 255, 255))
    y += line_height
    put(f"Pose: {current_pose}", 10, y, (255, 0, 0))
    y += int(line_height * 1.5)

    if 'error' in analysis_results:
        put("Error:", 10, y, (255, 255, 255))
        y += line_height
        put(str(analysis_results.get('error', 'Unknown error')), 20, y, (0, 0, 255))
        return panel

    put("Angles:", 10, y, (255, 255, 255))
    y += line_height
    for joint, angle in analysis_results.get('angles', {}).items():
        put(f"{joint.capitalize()}: {angle:.1f} deg", 20, y, (0, 255, 0))
        y += line_height

    # OpenCV's Hershey fonts only cover ASCII, so list items use "-" as bullet.
    if analysis_results.get('corrections'):
        y += line_height
        put("Corrections:", 10, y, (255, 255, 255))
        y += line_height
        for correction in analysis_results.get('corrections', []):
            put(f"- {correction}", 20, y, (0, 0, 255))
            y += line_height

    if analysis_results.get('notes'):
        y += line_height
        put("Notes:", 10, y, (200, 200, 200))
        y += line_height
        for note in analysis_results.get('notes', []):
            put(f"- {note}", 20, y, (200, 200, 200))
            y += line_height

    return panel


def _classify_frame_with_cnn(frame):
    """Write ``frame`` to a unique temporary JPEG and classify it with the CNN.

    The temporary file lives in the system temp directory and is always
    removed afterwards.

    Returns:
        The ``(label, confidence)`` tuple from ``predict_pose_cnn``.

    Raises:
        IOError: if the frame cannot be written to disk.
    """
    fd, temp_img_path = tempfile.mkstemp(prefix='temp_frame_for_cnn_', suffix='.jpg')
    os.close(fd)
    try:
        if not cv2.imwrite(temp_img_path, frame):
            raise IOError(f"Failed to write temporary frame to {temp_img_path}")
        return predict_pose_cnn(temp_img_path)
    finally:
        if os.path.exists(temp_img_path):
            os.remove(temp_img_path)


def process_video_mediapipe(video_path):
    """Annotate ``video_path`` with ``PoseAnalyzer`` output and a side panel.

    Every frame is passed to ``PoseAnalyzer.process_frame``; the first frame
    and then one frame every four seconds of video is additionally classified
    by the CNN, and the label is shown in the panel when its confidence is at
    least ``CNN_MIN_CONFIDENCE``. The result is written to
    ``output_mediapipe.mp4`` in the upload folder.

    NOTE(review): the output filename is fixed, so concurrent requests would
    overwrite each other's result.

    Args:
        video_path: Path of the input video.

    Returns:
        URL (relative) of the annotated video, built with ``url_for``.

    Raises:
        FileNotFoundError: if ``video_path`` does not exist.
        ValueError: if the video cannot be opened, the writer cannot be
            created, or no frame could be read.
        Exception: any other error is logged and re-raised.
    """
    cap = None
    out = None
    try:
        cleanup_memory()  # Clean up before processing
        logger.info(f"[PROCESS_VIDEO_MEDIAPIPE] Called with video_path: {video_path}")

        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        analyzer = PoseAnalyzer()
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Failed to open video file: {video_path}")

        fps, width, height = _read_video_properties(cap)
        total_width = width + PANEL_WIDTH  # video plus side panel
        logger.info(f"Processing video with MediaPipe: {width}x{height} @ {fps}fps")

        output_filename = 'output_mediapipe.mp4'
        output_path = os.path.join(app.config['UPLOAD_FOLDER'], output_filename)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (total_width, height))
        if not out.isOpened():
            raise ValueError(f"Failed to create output video writer at {output_path}")

        text_style = _panel_text_style(height)  # depends only on the video height
        segment_length = 4 * fps  # CNN prediction every 4 seconds
        frame_count = 0
        current_pose = 'Uncertain'  # Initial pose for MediaPipe
        last_valid_landmarks = None

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            if frame_count % 30 == 0:
                logger.info(f"Processing frame {frame_count}")
                cleanup_memory()  # Clean up periodically

            processed_frame, analysis_results, landmarks = analyzer.process_frame(
                frame, last_valid_landmarks=last_valid_landmarks)
            if landmarks:
                last_valid_landmarks = landmarks

            if (frame_count - 1) % segment_length == 0:
                try:
                    cnn_pose_pred, cnn_conf = _classify_frame_with_cnn(frame)
                    logger.info(f"[CNN] Frame {frame_count}: Pose: {cnn_pose_pred}, Conf: {cnn_conf:.2f}")
                    if cnn_conf >= CNN_MIN_CONFIDENCE:
                        current_pose = cnn_pose_pred  # Update current_pose to be displayed
                except Exception as e:
                    # A failed classification keeps the previous label.
                    logger.error(f"[CNN] Error predicting pose on frame {frame_count}: {e}")

            if frame_count % 30 == 0:
                logger.info(f"[MEDIAPIPE_PANEL] Frame {frame_count} - Current Pose for Panel: {current_pose}")

            panel = _draw_analysis_panel(height, text_style, current_pose, analysis_results)
            out.write(np.hstack((processed_frame, panel)))

        _release_video_handles(cap, out)

        if frame_count == 0:
            raise ValueError("No frames were processed from the video by MediaPipe")

        logger.info(f"MediaPipe video processing completed. Processed {frame_count} frames. "
                    f"Output: {output_path}")
        video_url = url_for('serve_video', filename=output_filename, _external=False)
        logger.info(f"[PROCESS_VIDEO_MEDIAPIPE] Returning video URL: {video_url}")
        return video_url
    except Exception as e:
        logger.error(f'Error in process_video_mediapipe: {e}')
        logger.error(traceback.format_exc())
        raise
    finally:
        # Safe to call again after the in-line release above.
        _release_video_handles(cap, out)
        cleanup_memory()  # Clean up in case of error


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


def safe_video_processing(video_path, model_choice):
    """Dispatch to the chosen processing back-end with logging and cleanup.

    Args:
        video_path: Path of the uploaded video.
        model_choice: ``'movenet'`` selects MoveNet; any other value selects
            the MediaPipe pipeline.

    Returns:
        The URL returned by the selected processing function.

    Raises:
        Exception: errors from processing are logged and re-raised.
    """
    try:
        if model_choice == 'movenet':
            return process_video_movenet(video_path)
        return process_video_mediapipe(video_path)
    except Exception as e:
        logger.error(f"Error in video processing: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        cleanup_memory()


@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a video upload, process it and return the result URL as JSON.

    Expects a multipart form with a ``video`` file (mp4/avi/mov/mkv) and an
    optional ``model_choice`` field. The uploaded input file is deleted after
    processing, whether it succeeded or not.

    Returns:
        JSON ``{'message', 'output_path'}`` on success, or ``{'error'}`` with
        status 400 (bad request) or 500 (processing failure).
    """
    try:
        cleanup_memory()

        if 'video' not in request.files:
            logger.error("[UPLOAD] No video file in request")
            return jsonify({'error': 'No video file provided'}), 400

        file = request.files['video']
        if file.filename == '':
            logger.error("[UPLOAD] Empty filename")
            return jsonify({'error': 'No selected file'}), 400

        extension = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
        if extension not in ALLOWED_EXTENSIONS:
            logger.error(f"[UPLOAD] Invalid file format: {file.filename}")
            return jsonify({'error': 'Invalid file format. Allowed formats: mp4, avi, mov, mkv'}), 400

        # Ensure the filename is properly sanitized
        filename = secure_filename(file.filename)
        logger.info(f"[UPLOAD] Original filename: {file.filename}")
        logger.info(f"[UPLOAD] Sanitized filename: {filename}")

        # Unique name: timestamp plus random suffix avoids same-second clashes.
        # The validated extension is reused because sanitising can strip it
        # (e.g. for names made only of non-ASCII characters).
        stem = os.path.splitext(filename)[0] or 'upload'
        unique_filename = f"{stem}_{int(time.time())}_{uuid.uuid4().hex[:8]}.{extension}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)

        # Ensure upload directory exists
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        logger.info(f"[UPLOAD] Saving file to: {filepath}")
        file.save(filepath)

        if not os.path.exists(filepath):
            logger.error(f"[UPLOAD] File not found after save: {filepath}")
            return jsonify({'error': 'Failed to save uploaded file'}), 500

        logger.info(f"[UPLOAD] File saved successfully. Size: {os.path.getsize(filepath)} bytes")

        try:
            model_choice = request.form.get('model_choice', 'Gladiator SupaDot')
            logger.info(f"[UPLOAD] Processing with model: {model_choice}")

            output_path_url = safe_video_processing(filepath, model_choice)
            logger.info(f"[UPLOAD] Processing complete. Output URL: {output_path_url}")

            output_path = os.path.join(app.config['UPLOAD_FOLDER'], os.path.basename(output_path_url))
            if not os.path.exists(output_path):
                logger.error(f"[UPLOAD] Output file not found: {output_path}")
                return jsonify({'error': 'Output video file not found'}), 500

            return jsonify({
                'message': f'Video processed successfully with {model_choice}',
                'output_path': output_path_url
            })
        except Exception as e:
            logger.error(f"[UPLOAD] Error processing video: {str(e)}")
            logger.error(traceback.format_exc())
            return jsonify({'error': f'Error processing video: {str(e)}'}), 500
        finally:
            # The input is only needed during processing.
            try:
                if os.path.exists(filepath):
                    os.remove(filepath)
                    logger.info(f"[UPLOAD] Cleaned up input file: {filepath}")
            except Exception as e:
                logger.error(f"[UPLOAD] Error cleaning up file: {str(e)}")
    except Exception as e:
        logger.error(f"[UPLOAD] Unexpected error: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        cleanup_memory()


# ---------------------------------------------------------------------------
# Error handlers
# ---------------------------------------------------------------------------
@app.errorhandler(413)
def request_entity_too_large(error):
    """Return a JSON error when the upload exceeds MAX_CONTENT_LENGTH."""
    logger.error(f"File too large: {error}")
    return jsonify({'error': 'File too large. Maximum size is 100MB'}), 413


@app.errorhandler(500)
def internal_server_error(error):
    """Return a JSON error for unhandled server errors."""
    logger.error(f"Internal server error: {error}")
    return jsonify({'error': 'Internal server error. Please try again later.'}), 500


@app.errorhandler(404)
def not_found_error(error):
    """Return a JSON error for unknown URLs."""
    logger.error(f"Resource not found: {error}")
    return jsonify({'error': 'Resource not found'}), 404


if __name__ == '__main__':
    # Ensure the port is 7860 and debug is False for HF Spaces deployment
    app.run(host='0.0.0.0', port=7860, debug=False)