Spaces:
Sleeping
Sleeping
File size: 3,543 Bytes
ae09838 3dfdd5f 61604b2 da56cf2 2d452c2 3dfdd5f 0f1ca3c 61604b2 c374261 61604b2 2933e05 61604b2 2933e05 61604b2 0068c73 61604b2 0068c73 61604b2 2d452c2 61604b2 3a3c94c 61604b2 2933e05 61604b2 0068c73 61604b2 0068c73 61604b2 2933e05 61604b2 3dfdd5f 2d452c2 61604b2 026ced7 61604b2 0068c73 61604b2 0068c73 61604b2 026ced7 3dfdd5f 2d452c2 61604b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
import cv2
import logging
from typing import Optional
import numpy as np
# Setup logging
# Runs at import time: records at INFO level and above go to the file
# "app.log" (a relative path), each line prefixed with a timestamp and the
# level name.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
# Global video capture object
# None until preload_video() opens a file; release_video() sets it back to None.
cap: Optional[cv2.VideoCapture] = None
# Offset added to current_frame_idx to compute the seek target in
# get_next_video_frame().
FRAME_SKIP: int = 5
# Frame position shared across calls: set to 0 by preload_video() and
# reset_video_index(), and refreshed from CAP_PROP_POS_FRAMES after each
# successful read in get_next_video_frame().
current_frame_idx: int = 0
def preload_video(video_path: str) -> None:
    """
    Load a video file for processing.

    Any previously loaded video is released first. On success the module-level
    ``cap`` holds the opened capture and ``current_frame_idx`` is set to 0.
    On failure the capture opened by this call is released and ``cap`` is
    left as ``None``, so a rejected video never stays loaded.

    Args:
        video_path: Path to the video file.

    Raises:
        RuntimeError: If the video cannot be opened, reports no frames,
            reports a non-positive FPS, or any other error occurs while
            loading. The original exception is attached as ``__cause__``.
    """
    global cap, current_frame_idx
    release_video()
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}. Ensure the file exists and is a supported format (.mp4, .avi).")
        # Validate video properties
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if total_frames <= 0:
            raise RuntimeError(f"Invalid video: {video_path}. No frames detected.")
        if fps <= 0:
            raise RuntimeError(f"Invalid video: {video_path}. Invalid FPS.")
        # Decode the FOURCC integer into its four characters, least
        # significant byte first.
        fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
        codec = "".join(chr((fourcc >> 8 * i) & 0xFF) for i in range(4))
        log_message = (f"Loaded video: {video_path}, Total Frames: {total_frames}, "
                       f"FPS: {fps}, Codec: {codec}")
        logging.info(log_message)
        current_frame_idx = 0
    except Exception as e:
        # Do not leave a half-validated capture behind: release it so that
        # `cap` is None again and the underlying handle is not leaked.
        release_video()
        logging.error(f"Error loading video {video_path}: {str(e)}")
        # Same message as before; `from e` keeps the original traceback.
        raise RuntimeError(f"Error loading video {video_path}: {str(e)}") from e
def release_video() -> None:
    """
    Release the module-level video capture, if one is set.

    Does nothing when ``cap`` is already ``None``; otherwise calls
    ``release()`` on it and sets ``cap`` back to ``None``.
    """
    global cap
    if cap is None:
        return
    cap.release()
    cap = None
def reset_video_index() -> None:
    """
    Rewind the loaded video to its first frame.

    Seeks the capture to frame position 0 and sets ``current_frame_idx`` to 0.
    If no capture is set or it is not opened, only a warning is logged.
    """
    global current_frame_idx
    capture_ready = cap is not None and cap.isOpened()
    if not capture_ready:
        logging.warning("Cannot reset video: Video capture not initialized.")
        return
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    current_frame_idx = 0
    logging.info("Reset video to start.")
def get_next_video_frame() -> Optional[np.ndarray]:
    """
    Retrieve the next frame from the video, skipping frames as specified.

    Seeks ``FRAME_SKIP`` positions past ``current_frame_idx`` and reads one
    frame. If that read fails, the video is rewound via
    ``reset_video_index()`` and one frame is read from the start instead.
    After a successful read, ``current_frame_idx`` is updated from the
    capture's reported ``CAP_PROP_POS_FRAMES``.

    NOTE: because ``current_frame_idx`` starts at 0, the first call after
    loading seeks to ``FRAME_SKIP``, whereas the first read after a rewind
    starts at position 0.

    Returns:
        The next frame as a numpy array, or None if no frame is available
        (capture not initialized, read failed even after a rewind, or an
        exception was raised and logged).
    """
    global current_frame_idx
    if cap is None or not cap.isOpened():
        logging.error("Video capture not initialized.")
        return None
    try:
        # Skip frames to reduce processing load
        previous_idx = current_frame_idx
        target_frame = previous_idx + FRAME_SKIP
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        success, frame = cap.read()
        wrapped = False
        if not success or frame is None:
            # Video ended, reset to start
            logging.info("Reached end of video, resetting to start.")
            reset_video_index()
            success, frame = cap.read()
            if not success or frame is None:
                logging.warning("Failed to retrieve frame after reset.")
                return None
            wrapped = True
        current_frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        if wrapped:
            # The frame came from the start of the video, so nothing was
            # skipped; the old delta against previous_idx went negative here.
            skipped = 0
        else:
            # OpenCV documents CAP_PROP_POS_FRAMES as the index of the frame
            # to be decoded next, so the frame just read is one before it.
            # Clamp at 0 in case a backend reports an unexpected position.
            skipped = max(current_frame_idx - 1 - previous_idx, 0)
        logging.info(f"Retrieved frame {current_frame_idx}, skipped {skipped} frames")
        return frame
    except Exception as e:
        # Best-effort API: callers receive None rather than an exception.
        logging.error(f"Error retrieving frame: {str(e)}")
        return None