Spaces:
Sleeping
Sleeping
import cv2 | |
import logging | |
from typing import Optional | |
import numpy as np | |
# Setup logging | |
logging.basicConfig( | |
filename="app.log", | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
# Global video capture object | |
cap: Optional[cv2.VideoCapture] = None | |
FRAME_SKIP: int = 5 | |
current_frame_idx: int = 0 | |
def preload_video(video_path: str) -> None: | |
""" | |
Load a video file for processing. | |
Args: | |
video_path: Path to the video file. | |
Raises: | |
RuntimeError: If the video cannot be loaded. | |
""" | |
global cap, current_frame_idx | |
release_video() | |
try: | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
raise RuntimeError(f"Failed to open video: {video_path}. Ensure the file exists and is a supported format (.mp4, .avi).") | |
# Validate video properties | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
if total_frames <= 0: | |
raise RuntimeError(f"Invalid video: {video_path}. No frames detected.") | |
if fps <= 0: | |
raise RuntimeError(f"Invalid video: {video_path}. Invalid FPS.") | |
# Check video codec | |
fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) | |
codec = "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) | |
log_message = (f"Loaded video: {video_path}, Total Frames: {total_frames}, " | |
f"FPS: {fps}, Codec: {codec}") | |
logging.info(log_message) | |
current_frame_idx = 0 | |
except Exception as e: | |
logging.error(f"Error loading video {video_path}: {str(e)}") | |
raise RuntimeError(f"Error loading video {video_path}: {str(e)}") | |
def release_video() -> None: | |
""" | |
Release the video capture object. | |
""" | |
global cap | |
if cap is not None: | |
cap.release() | |
cap = None | |
def reset_video_index() -> None: | |
""" | |
Reset the video frame index to the beginning. | |
""" | |
global current_frame_idx | |
if cap is not None and cap.isOpened(): | |
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
current_frame_idx = 0 | |
logging.info("Reset video to start.") | |
else: | |
logging.warning("Cannot reset video: Video capture not initialized.") | |
def get_next_video_frame() -> Optional[np.ndarray]: | |
""" | |
Retrieve the next frame from the video, skipping frames as specified. | |
Returns: | |
The next frame as a numpy array, or None if no frame is available. | |
""" | |
global cap, current_frame_idx | |
if cap is None or not cap.isOpened(): | |
logging.error("Video capture not initialized.") | |
return None | |
try: | |
# Skip frames to reduce processing load | |
previous_idx = current_frame_idx | |
target_frame = current_frame_idx + FRAME_SKIP | |
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) | |
success, frame = cap.read() | |
if not success or frame is None: | |
# Video ended, reset to start | |
logging.info("Reached end of video, resetting to start.") | |
reset_video_index() | |
success, frame = cap.read() | |
if not success or frame is None: | |
logging.warning("Failed to retrieve frame after reset.") | |
return None | |
current_frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) | |
logging.info(f"Retrieved frame {current_frame_idx}, skipped {current_frame_idx - previous_idx} frames") | |
return frame | |
except Exception as e: | |
logging.error(f"Error retrieving frame: {str(e)}") | |
return None |