Spaces:
Sleeping
Sleeping
import cv2 | |
import os | |
import uuid | |
import numpy as np | |
from utils import extract_frames, save_video | |
# Directory that receives every clip written by this module. It is created
# at import time so the processing functions can write into it directly.
TEMP_DIR = "temp_videos"
os.makedirs(TEMP_DIR, exist_ok=True)
def process_live_video():
    """Save the last 10 seconds of the (simulated) live feed as a clip.

    The "live" source is currently a fixed sample file. Its final 10 seconds
    (or the whole file when it is shorter) are read into memory and written
    to a uniquely named file inside ``TEMP_DIR`` through ``save_video``.

    Returns:
        tuple: ``(trimmed_path, decision)`` -- the path of the saved clip and
        a placeholder decision string.

    Raises:
        OSError: If the live source cannot be opened.
        ValueError: If the source reports no usable frame rate, which makes
            it impossible to locate the last 10 seconds.
    """
    # Simulated live stream (replace with real buffer system if needed)
    live_source = "sample_videos/lbw_sample.mp4"
    cap = cv2.VideoCapture(live_source)
    try:
        # VideoCapture does not raise on a missing/unreadable source; it only
        # reports the failure through isOpened().
        if not cap.isOpened():
            raise OSError(f"Could not open live video source: {live_source}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # A frame rate of 0 (or NaN) would otherwise surface as an opaque
        # ZeroDivisionError in the duration computation below.
        if not fps > 0:
            raise ValueError(
                f"Live video source reports an invalid frame rate: {fps!r}"
            )
        duration = total_frames / fps

        # Trim last 10 seconds
        trim_start = max(0, duration - 10)
        start_frame = int(trim_start * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture even when one of the checks above raises.
        cap.release()

    video_id = str(uuid.uuid4())
    trimmed_path = os.path.join(TEMP_DIR, f"{video_id}_trimmed.mp4")
    save_video(frames, trimmed_path, fps)

    # Extract & Analyze
    decision = "Analyzing..."  # Placeholder, will use lbw_detector later
    return trimmed_path, decision
def process_uploaded_video(video_file):
    """Re-save an uploaded video under a unique name in ``TEMP_DIR``.

    Every frame of the upload is read into memory and written back out
    through ``save_video`` at the frame rate reported by the capture.

    Args:
        video_file: Source handed straight to ``cv2.VideoCapture``
            (presumably a filesystem path to the upload -- confirm against
            the caller).

    Returns:
        tuple: ``(saved_path, decision)`` -- the path of the saved copy and
        a placeholder decision string.

    Raises:
        OSError: If the uploaded video cannot be opened.
    """
    cap = cv2.VideoCapture(video_file)
    try:
        # Without this check an unreadable upload would silently yield an
        # empty frame list and a frame rate of 0 for save_video.
        if not cap.isOpened():
            raise OSError(f"Could not open uploaded video: {video_file}")

        fps = cap.get(cv2.CAP_PROP_FPS)

        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        # Release the capture even if reading fails part-way through.
        cap.release()

    video_id = str(uuid.uuid4())
    saved_path = os.path.join(TEMP_DIR, f"{video_id}_upload.mp4")
    save_video(frames, saved_path, fps)

    # Extract & Analyze
    decision = "Analyzing..."  # Placeholder for lbw_detector output
    return saved_path, decision