lbw_drs_app_new / drs_modules /video_processing.py
dschandra's picture
Upload 6 files
2db7738 verified
raw
history blame
3.91 kB
"""Video processing utilities for the DRS application.
This module provides helper functions to save uploaded videos to the
filesystem and to trim the last N seconds from a video. Using
OpenCV's ``VideoCapture`` and ``VideoWriter`` avoids external
dependencies like ffmpeg or moviepy, which may not be installed in
all execution environments.
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
from typing import Union
import cv2
def save_uploaded_video(name: str, file_obj: Union[bytes, str, Path]) -> str:
"""Persist an uploaded video to a predictable location on disk.
When a user records or uploads a video in the Gradio interface, it
arrives as a temporary file object. To analyse the video later we
copy it into the working directory using its original filename.
Parameters
----------
name: str
The original filename from the upload widget.
file_obj: Union[bytes, str, Path]
The file-like object representing the uploaded video. Gradio
passes the file as a ``gradio.Files`` object whose `.name`
property holds the temporary path. This function accepts
either the temporary path or an open file handle.
Returns
-------
str
The absolute path where the video has been saved.
"""
# Determine a safe output directory. Use the current working
# directory so that Gradio can later access the file by path.
output_dir = Path(os.getcwd()) / "user_videos"
output_dir.mkdir(exist_ok=True)
# Compose an output filename; avoid overwriting by prefixing with an
# incrementing integer if necessary.
base_name = Path(name).stem
ext = Path(name).suffix or ".mp4"
counter = 0
dest = output_dir / f"{base_name}{ext}"
while dest.exists():
counter += 1
dest = output_dir / f"{base_name}_{counter}{ext}"
# If file_obj is a path, simply copy it; otherwise, read and write
if isinstance(file_obj, (str, Path)):
shutil.copy(str(file_obj), dest)
else:
# Gradio passes a file-like object with a `.read()` method
with open(dest, "wb") as f_out:
f_out.write(file_obj.read())
return str(dest)
def trim_last_seconds(input_path: str, output_path: str, seconds: int) -> None:
"""Save the last ``seconds`` of a video to ``output_path``.
This function reads the entire video file, calculates the starting
frame corresponding to ``seconds`` before the end, and writes the
remaining frames to a new video using OpenCV. If the video is
shorter than the requested duration, the whole video is copied.
Parameters
----------
input_path: str
Path to the source video file.
output_path: str
Path where the trimmed video will be saved.
seconds: int
The duration from the end of the video to retain.
"""
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
raise RuntimeError(f"Unable to open video: {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps <= 0:
fps = 30.0 # default fallback
frames_to_keep = int(seconds * fps)
start_frame = max(total_frames - frames_to_keep, 0)
# Prepare writer with the same properties as the input
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# Skip frames until start_frame
current = 0
while current < start_frame:
ret, _ = cap.read()
if not ret:
break
current += 1
# Write remaining frames
while True:
ret, frame = cap.read()
if not ret:
break
out.write(frame)
cap.release()
out.release()