Spaces:
Runtime error
Runtime error
import cv2 | |
import os | |
import logging | |
# Setup logging | |
logging.basicConfig( | |
filename="app.log", | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
# Global state | |
VIDEO_DIR = "data" | |
video_files = [] | |
video_index = 0 | |
cap = None | |
FRAME_SKIP = 3 | |
PRELOADED_FRAMES = [] | |
def update_video_files(video_path: str = None): | |
global video_files | |
if video_path and os.path.exists(video_path) and video_path.endswith((".mp4", ".avi")): | |
video_files = [video_path] | |
else: | |
if os.path.exists(VIDEO_DIR): | |
video_files = [ | |
os.path.join(VIDEO_DIR, file) | |
for file in sorted(os.listdir(VIDEO_DIR)) | |
if file.endswith((".mp4", ".avi")) | |
] | |
else: | |
video_files = [] | |
if not video_files: | |
logging.warning("No video files found.") | |
else: | |
logging.info(f"Found video files: {video_files}") | |
def preload_video(video_path: str = None): | |
global cap, video_index, PRELOADED_FRAMES, video_files | |
update_video_files(video_path) | |
if not video_files: | |
logging.error("No video files available to preload.") | |
raise RuntimeError("No video files found in the 'data' directory or provided path.") | |
try: | |
video_index = 0 | |
cap = cv2.VideoCapture(video_files[video_index]) | |
if not cap.isOpened(): | |
raise RuntimeError(f"Failed to open video {video_files[video_index]}") | |
PRELOADED_FRAMES = [] | |
for _ in range(5): | |
ret, frame = cap.read() | |
if ret: | |
frame = cv2.resize(frame, (320, 240)) | |
PRELOADED_FRAMES.append(frame) | |
else: | |
break | |
cap.release() | |
cap = None | |
logging.info(f"Preloaded {len(PRELOADED_FRAMES)} frames from {video_files[video_index]}") | |
except Exception as e: | |
logging.error(f"Error preloading video: {str(e)}") | |
raise RuntimeError(f"Failed to preload video: {str(e)}") | |
def get_next_video_frame(): | |
global cap, video_index, PRELOADED_FRAMES | |
if not video_files: | |
raise RuntimeError("No video files found.") | |
if PRELOADED_FRAMES: | |
frame = PRELOADED_FRAMES.pop(0) | |
logging.debug(f"Retrieved preloaded frame {len(PRELOADED_FRAMES) + 1}") | |
return frame | |
if cap is None or not cap.isOpened(): | |
cap = cv2.VideoCapture(video_files[video_index]) | |
try: | |
for _ in range(FRAME_SKIP): | |
ret, frame = cap.read() | |
if not ret: | |
cap.release() | |
video_index = (video_index + 1) % len(video_files) | |
cap = cv2.VideoCapture(video_files[video_index]) | |
ret, frame = cap.read() | |
if not ret: | |
raise RuntimeError(f"Cannot read video {video_files[video_index]}") | |
break | |
frame = cv2.resize(frame, (320, 240)) | |
logging.debug(f"Retrieved frame from {video_files[video_index]}") | |
return frame | |
except Exception as e: | |
logging.error(f"Error retrieving frame: {str(e)}") | |
raise RuntimeError(f"Failed to retrieve frame: {str(e)}") | |
def reset_video_index(): | |
global video_index | |
video_index = 0 | |
logging.info("Reset video index to 0.") | |
def release_video(): | |
global cap | |
if cap is not None: | |
cap.release() | |
cap = None | |
logging.info("Released video capture object.") |