seawolf2357's picture
Upload folder using huggingface_hub
684943d verified
import logging
import os
import cv2
import ffmpeg
import numpy as np
import torch
class VideoPreprocessor:
def __init__(self, cfg):
self.cfg = cfg
def video_process(self):
'''
assuming the video is saved on somewhere. Need modified if video are given
as a obj directly.
'''
self.extract_frames(self.cfg.pipeline.rgb_video_path, "input")
self.extract_frames(self.cfg.pipeline.normal_video_path, "normal")
if self.cfg.feature_extractor.type == "open-seg":
self.extract_masks("lang_features_dim3")
elif self.cfg.feature_extractor.type == "lseg":
self.extract_masks("lang_features_dim4")
def extract_frames(self, video_path, file_name):
img_save_path = os.path.join(self.cfg.pipeline.data_path, file_name)
format = self.cfg.video_processor.img_format
logging.info(f"Extracting frames from {video_path}...")
os.makedirs(img_save_path, exist_ok=True)
ffmpeg.input(video_path).output(os.path.join(img_save_path, f"%04d.{format}")).run(quiet=True)
def extract_masks(self, save_dir_name):
colors = np.load(os.path.join(self.cfg.pipeline.data_path, "colors.npy"))
colors = torch.from_numpy(colors).to(dtype=torch.float32, device="cuda") # [n_masks, 3]
colors /= 255
seg_video_path = self.cfg.pipeline.seg_video_path
logging.info(f"Loading mask video from {seg_video_path}")
seg_video = torch.from_numpy(self.load_from_video(seg_video_path)).to(dtype=torch.float32, device="cuda")
seg_video /= 255
for idx, frame in enumerate(seg_video):
dist = ((frame.unsqueeze(-2) - colors[None, None, :, :]) ** 2).sum(dim=-1)
mask = torch.argmin(dist, dim=-1) - 1 # -1 : background
save_path = os.path.join(self.cfg.pipeline.data_path, save_dir_name)
os.makedirs(save_path, exist_ok=True)
np.save(os.path.join(save_path, str(idx+1).zfill(4) + "_s.npy"), mask.cpu().numpy())
def load_from_frames(self, frame_path):
frame_list = os.listdir(frame_path)
frame_list = sorted(filter(lambda x: x.split(".")[-1] in ["jpg", "png", "jpeg"], frame_list))
all_frames = []
for frame_name in frame_list:
frame = cv2.imread(os.path.join(frame_path, frame_name))
all_frames.append(frame)
all_frames = np.array(all_frames)
return all_frames
def load_from_video(self, video_path):
out, _ = (
ffmpeg.input(video_path)
.output("pipe:", format="rawvideo", pix_fmt="rgb24")
.run(capture_stdout=True)
)
probe = ffmpeg.probe(video_path)
video_info = next(stream for stream in probe["streams"] if stream["codec_type"] == "video")
width = int(video_info["width"])
height = int(video_info["height"])
video_array = np.frombuffer(out, dtype=np.uint8)
video_array = video_array.reshape((-1, height, width, 3))
return video_array