File size: 3,543 Bytes
ae09838
3dfdd5f
61604b2
 
da56cf2
2d452c2
3dfdd5f
 
 
 
 
0f1ca3c
61604b2
 
 
 
c374261
61604b2
 
 
 
 
 
 
 
 
 
2933e05
61604b2
2933e05
61604b2
0068c73
 
61604b2
 
0068c73
 
 
 
 
 
 
 
 
 
 
61604b2
2d452c2
61604b2
 
3a3c94c
61604b2
 
 
 
 
 
 
 
2933e05
61604b2
 
 
 
 
0068c73
61604b2
 
 
0068c73
 
61604b2
 
 
 
 
 
 
 
2933e05
61604b2
 
3dfdd5f
2d452c2
61604b2
026ced7
61604b2
 
 
 
 
0068c73
61604b2
 
 
0068c73
61604b2
 
026ced7
3dfdd5f
 
2d452c2
61604b2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import cv2
import logging
from typing import Optional
import numpy as np

# Logging configuration: records at INFO level and above are written to
# app.log, each prefixed with a timestamp and the level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="app.log",
)

# Module-level playback state shared by the functions in this file.
# FRAME_SKIP: offset added to the current position before each read.
FRAME_SKIP: int = 5
# current_frame_idx: capture position recorded after the most recent read.
current_frame_idx: int = 0
# cap: the active capture object, or None while no video is loaded.
cap: Optional[cv2.VideoCapture] = None

def preload_video(video_path: str) -> None:
    """
    Load a video file for processing.

    Any capture that is already loaded is released first. On success the
    module-level capture stays open and the frame index is reset to 0. On
    failure the capture opened by this call is released again, so the module
    is never left holding a half-validated video.

    Args:
        video_path: Path to the video file.
    Raises:
        RuntimeError: If the video cannot be opened, reports no frames,
            reports a non-positive FPS, or any other error occurs while
            loading. The original exception is attached as the cause.
    """
    global cap, current_frame_idx
    release_video()
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}. Ensure the file exists and is a supported format (.mp4, .avi).")

        # Validate video properties
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if total_frames <= 0:
            raise RuntimeError(f"Invalid video: {video_path}. No frames detected.")
        if fps <= 0:
            raise RuntimeError(f"Invalid video: {video_path}. Invalid FPS.")

        # The FOURCC is four ASCII bytes packed into one integer, lowest byte
        # first; unpack it into a readable codec tag for the log.
        fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
        codec = "".join(chr((fourcc >> 8 * i) & 0xFF) for i in range(4))
        log_message = (f"Loaded video: {video_path}, Total Frames: {total_frames}, "
                       f"FPS: {fps}, Codec: {codec}")
        logging.info(log_message)
        current_frame_idx = 0
    except Exception as e:
        # Release whatever was opened so a rejected video does not stay
        # loaded (and its handle does not leak) after this call fails.
        release_video()
        logging.error(f"Error loading video {video_path}: {str(e)}")
        raise RuntimeError(f"Error loading video {video_path}: {str(e)}") from e

def release_video() -> None:
    """
    Release the module-level video capture, if one is loaded, and clear it.

    Does nothing when no capture is currently held.
    """
    global cap
    if cap is None:
        return
    cap.release()
    cap = None

def reset_video_index() -> None:
    """
    Rewind the loaded video to its first frame.

    Seeks the capture to frame 0 and zeroes the tracked frame index. When no
    capture is loaded or it is not open, only a warning is logged and nothing
    else changes.
    """
    global current_frame_idx
    if cap is None or not cap.isOpened():
        logging.warning("Cannot reset video: Video capture not initialized.")
        return
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    current_frame_idx = 0
    logging.info("Reset video to start.")

def get_next_video_frame() -> Optional[np.ndarray]:
    """
    Retrieve the next frame from the video, skipping frames as specified.

    The capture is moved FRAME_SKIP frames past the current position before
    reading. If that read fails the video is treated as finished: it is
    rewound to the start and the first frame is read instead.

    Returns:
        The next frame as a numpy array, or None if no frame is available
        (no open capture, the read after rewinding failed, or an unexpected
        error occurred; errors are logged rather than raised).
    """
    global current_frame_idx
    if cap is None or not cap.isOpened():
        logging.error("Video capture not initialized.")
        return None

    try:
        # Skip frames to reduce processing load
        previous_idx = current_frame_idx
        cap.set(cv2.CAP_PROP_POS_FRAMES, previous_idx + FRAME_SKIP)
        success, frame = cap.read()
        wrapped = not success or frame is None
        if wrapped:
            # Video ended, reset to start
            logging.info("Reached end of video, resetting to start.")
            reset_video_index()
            success, frame = cap.read()
            if not success or frame is None:
                logging.warning("Failed to retrieve frame after reset.")
                return None
        # CAP_PROP_POS_FRAMES is the index of the frame to be decoded next,
        # so the frame just read is the one before it. The stored position
        # itself is left as reported so the next call skips from there.
        current_frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        retrieved_idx = max(current_frame_idx - 1, 0)
        # After a rewind nothing was skipped going forward; never report a
        # negative count.
        skipped = 0 if wrapped else max(retrieved_idx - previous_idx, 0)
        logging.info(f"Retrieved frame {retrieved_idx}, skipped {skipped} frames")
        return frame
    except Exception as e:
        # Best-effort API: report the failure with its traceback and signal
        # "no frame" to the caller instead of propagating.
        logging.exception(f"Error retrieving frame: {str(e)}")
        return None