Spaces:
Runtime error
Runtime error
File size: 3,419 Bytes
ae09838 2d452c2 3dfdd5f da56cf2 2d452c2 3dfdd5f 0f1ca3c 2933e05 b379351 2933e05 2d452c2 2933e05 c374261 2933e05 2d452c2 2933e05 3a3c94c 2d452c2 2933e05 3dfdd5f 2d452c2 2933e05 2d452c2 2933e05 3dfdd5f 2d452c2 2933e05 2d452c2 2933e05 2d452c2 2933e05 2d452c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import cv2
import os
import logging
# Logging configuration: INFO-and-above records go to app.log, each line
# formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="app.log",
)

# Tunables.
VIDEO_DIR = "data"  # directory scanned for video files when no path is given
FRAME_SKIP = 3      # number of frames read per call when pulling from a capture

# Mutable module-level playback state (rebound via `global` by the functions
# in this module).
video_files = []       # paths of the videos currently in the playlist
PRELOADED_FRAMES = []  # frames read ahead of time, served before live reads
video_index = 0        # index into video_files of the video being played
cap = None             # open cv2.VideoCapture, or None when nothing is open
def update_video_files(video_path: str = None) -> None:
    """Rebuild the global ``video_files`` playlist.

    If ``video_path`` names an existing file with a ``.mp4`` or ``.avi``
    extension (matched case-insensitively), the playlist becomes just that
    file. Otherwise the playlist becomes every ``.mp4``/``.avi`` entry of
    ``VIDEO_DIR`` in sorted name order, or an empty list when ``VIDEO_DIR``
    is not a directory.

    Args:
        video_path: Optional path to a single video file to play.

    Side effects:
        Rebinds the module-level ``video_files`` and logs the outcome.
    """
    global video_files
    extensions = (".mp4", ".avi")

    if (
        video_path
        and os.path.isfile(video_path)
        and video_path.lower().endswith(extensions)
    ):
        video_files = [video_path]
    else:
        if video_path:
            # Make the fallback visible instead of silently ignoring the
            # caller's request.
            logging.warning(
                "Ignoring unusable video path %r; scanning %r instead.",
                video_path,
                VIDEO_DIR,
            )
        # isdir (not exists): listdir would raise if VIDEO_DIR were a file.
        if os.path.isdir(VIDEO_DIR):
            video_files = [
                os.path.join(VIDEO_DIR, name)
                for name in sorted(os.listdir(VIDEO_DIR))
                # Lower-case first so "CLIP.MP4" is picked up as well.
                if name.lower().endswith(extensions)
            ]
        else:
            video_files = []

    if not video_files:
        logging.warning("No video files found.")
    else:
        logging.info(f"Found video files: {video_files}")
def preload_video(video_path: str = None, num_frames: int = 5):
    """Refresh the playlist and buffer the first frames of its first video.

    Calls ``update_video_files(video_path)``, opens the first playlist entry,
    reads up to ``num_frames`` frames, resizes each to 320x240 and stores them
    in the global ``PRELOADED_FRAMES``. The capture used for preloading is
    always released before returning, and the global ``cap`` is left as
    ``None``.

    Args:
        video_path: Optional path to a single video; forwarded to
            ``update_video_files``.
        num_frames: Maximum number of frames to buffer (default 5, the value
            that used to be hard-coded).

    Raises:
        RuntimeError: If no video files are available, or if opening or
            reading the video fails (the original error is chained).
    """
    global cap, video_index, PRELOADED_FRAMES
    update_video_files(video_path)
    if not video_files:
        logging.error("No video files available to preload.")
        raise RuntimeError("No video files found in the 'data' directory or provided path.")

    # Release any capture left open by earlier playback instead of silently
    # overwriting the handle.
    if cap is not None:
        cap.release()
        cap = None

    # Reset state up front so a failed preload cannot leave frames from a
    # previous video queued against the new playlist.
    video_index = 0
    PRELOADED_FRAMES = []
    source = video_files[video_index]

    capture = None
    try:
        capture = cv2.VideoCapture(source)
        if not capture.isOpened():
            raise RuntimeError(f"Failed to open video {source}")

        frames = []
        for _ in range(num_frames):
            ret, frame = capture.read()
            if not ret:
                # Video is shorter than num_frames; keep what we have.
                break
            frames.append(cv2.resize(frame, (320, 240)))
        PRELOADED_FRAMES = frames
        logging.info(f"Preloaded {len(PRELOADED_FRAMES)} frames from {source}")
    except Exception as e:
        logging.error(f"Error preloading video: {str(e)}")
        raise RuntimeError(f"Failed to preload video: {str(e)}") from e
    finally:
        # Always give the handle back, on success and on every error path.
        # NOTE(review): because the capture is closed here, the next live read
        # reopens the video from its start, so the preloaded frames appear to
        # be replayed - confirm whether that is intended.
        if capture is not None:
            capture.release()
def get_next_video_frame():
    """Return the next frame to display.

    Frames queued in ``PRELOADED_FRAMES`` are served first, exactly as
    stored. Once the queue is empty, frames are read from the global
    capture: ``FRAME_SKIP`` frames are read per call and the last one is
    returned, resized to 320x240. When the current video yields no frame,
    the capture is released, ``video_index`` advances (wrapping around the
    playlist) and the first frame of the next video is returned instead.

    Returns:
        The next frame (whatever ``cv2.VideoCapture.read`` / ``cv2.resize``
        produce, or a previously preloaded frame).

    Raises:
        RuntimeError: If the playlist is empty, or if a frame cannot be
            read or resized (the original error is chained).
    """
    global cap, video_index, PRELOADED_FRAMES
    if not video_files:
        raise RuntimeError("No video files found.")

    if PRELOADED_FRAMES:
        frame = PRELOADED_FRAMES.pop(0)
        logging.debug(f"Retrieved preloaded frame {len(PRELOADED_FRAMES) + 1}")
        return frame

    try:
        # The playlist may have been rebuilt (and shrunk) since the index
        # was last advanced; fall back to the first video rather than
        # failing with an IndexError.
        if video_index >= len(video_files):
            video_index = 0

        if cap is None or not cap.isOpened():
            cap = cv2.VideoCapture(video_files[video_index])

        frame = None
        # Read at least once so `frame` is always bound, even if FRAME_SKIP
        # is configured as 0 or a negative number.
        for _ in range(max(1, FRAME_SKIP)):
            ret, frame = cap.read()
            if ret:
                continue

            # Current video is exhausted (or unreadable): move on to the
            # next one, wrapping back to the start of the playlist.
            cap.release()
            video_index = (video_index + 1) % len(video_files)
            cap = cv2.VideoCapture(video_files[video_index])
            ret, frame = cap.read()
            if not ret:
                # Do not keep a dead handle around in the global.
                cap.release()
                cap = None
                raise RuntimeError(f"Cannot read video {video_files[video_index]}")
            # Use the first frame of the new video; no further skipping.
            break

        frame = cv2.resize(frame, (320, 240))
        logging.debug(f"Retrieved frame from {video_files[video_index]}")
        return frame
    except Exception as e:
        logging.error(f"Error retrieving frame: {str(e)}")
        raise RuntimeError(f"Failed to retrieve frame: {str(e)}") from e
def reset_video_index():
    """Restart playback from the first video in the playlist.

    Sets the global ``video_index`` to 0 and releases any open capture, so
    the next live read opens the first video. Without the release, the open
    capture would keep serving the old video, and the following rollover
    would jump to index 1, skipping the first video.
    """
    global video_index, cap
    if cap is not None:
        try:
            cap.release()
        finally:
            cap = None
    video_index = 0
    logging.info("Reset video index to 0.")
def release_video():
    """Release the global video capture, if one is open.

    Calls ``release()`` on the global ``cap`` and resets it to ``None``.
    Does nothing (and logs nothing) when no capture is held, so it is safe
    to call repeatedly.
    """
    global cap
    if cap is not None:
        try:
            cap.release()
        finally:
            # Clear the handle even if release() fails, so later calls do
            # not keep retrying on a broken object.
            cap = None
        logging.info("Released video capture object.")