# ------------------------------------------------------------------------
# Modified from Grounded-SAM (https://github.com/IDEA-Research/Grounded-Segment-Anything)
# ------------------------------------------------------------------------
"""Gradio demo: matte a text-described subject out of a short video.

Each frame is processed with GroundingDINO (text -> box), the MAM model
(box -> alpha matte) and Stable Diffusion (text -> background image), and
the per-frame results are re-assembled into three videos.
"""
import os
import sys
import random
import warnings

# NOTE(review): os.system runs each command in its own subshell, so this
# `export` does not reach the pip commands below. Kept unchanged so the way
# the extensions get built does not silently change -- confirm the intent.
os.system("export BUILD_WITH_CUDA=True")
os.system("python -m pip install -e segment-anything")
os.system("python -m pip install -e GroundingDINO")
os.system("pip install --upgrade diffusers[torch]")
#os.system("pip install opencv-python pycocotools matplotlib")
# The editable installs above live in these folders; make them importable.
sys.path.insert(0, './GroundingDINO')
sys.path.insert(0, './segment-anything')
warnings.filterwarnings("ignore")

import cv2
from scipy import ndimage
import gradio as gr
import argparse

import numpy as np
from PIL import Image
from moviepy.editor import *
import torch
from torch.nn import functional as F
import torchvision
import networks
import utils

# Grounding DINO
from groundingdino.util.inference import Model

# SAM
from segment_anything.utils.transforms import ResizeLongestSide

# SD
from diffusers import StableDiffusionPipeline


transform = ResizeLongestSide(1024)
# Green Screen
PALETTE_back = (51, 255, 146)

GROUNDING_DINO_CONFIG_PATH = "GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py"
GROUNDING_DINO_CHECKPOINT_PATH = "checkpoints/groundingdino_swint_ogc.pth"
mam_checkpoint = "checkpoints/mam_sam_vitb.pth"
output_dir = "outputs"
device = 'cuda'
background_list = os.listdir('assets/backgrounds')

# initialize MAM
mam_model = networks.get_generator_m2m(seg='sam', m2m='sam_decoder_deep')
mam_model.to(device)
checkpoint = torch.load(mam_checkpoint, map_location=device)
mam_model.load_state_dict(utils.remove_prefix_state_dict(checkpoint['state_dict']), strict=True)
mam_model = mam_model.eval()

# initialize GroundingDINO
grounding_dino_model = Model(
    model_config_path=GROUNDING_DINO_CONFIG_PATH,
    model_checkpoint_path=GROUNDING_DINO_CHECKPOINT_PATH,
    device=device,
)

# initialize StableDiffusionPipeline
generator = StableDiffusionPipeline.from_pretrained(
    "checkpoints/stable-diffusion-v1-5", torch_dtype=torch.float16
)
generator.to(device)


def get_frames(video_in):
    """Resize a video to 512 px height and dump every frame to a JPEG file.

    The resized copy is written to ``video_resized.mp4`` (capped at 30 fps)
    and its frames to ``kang<i>.jpg`` in the current working directory.

    Args:
        video_in: Path of the input video file.

    Returns:
        A ``(frames, fps)`` tuple: the list of written frame file names in
        playback order, and the frame rate OpenCV reports for the resized
        video.
    """
    # resize the video
    clip = VideoFileClip(video_in)
    try:
        # check fps
        if clip.fps > 30:
            print("vide rate is over 30, resetting to 30")
            target_fps = 30
        else:
            print("video rate is OK")
            target_fps = clip.fps
        clip_resized = clip.resize(height=512)
        clip_resized.write_videofile("video_resized.mp4", fps=target_fps)
    finally:
        # Release the reader held by the clip even if encoding fails.
        clip.close()
    print("video resized to 512 height")

    # Opens the Video file with CV2
    frames = []
    cap = cv2.VideoCapture("video_resized.mp4")
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        print("video fps: " + str(fps))
        i = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = 'kang' + str(i) + '.jpg'
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
            i += 1
    finally:
        cap.release()
    cv2.destroyAllWindows()
    print("broke the video into frames")

    return frames, fps


def create_video(frames, fps, output_path="movie.mp4"):
    """Encode a sequence of image files into a video.

    Args:
        frames: Image file paths, in playback order.
        fps: Frame rate of the resulting video.
        output_path: Destination file. Defaults to ``movie.mp4``; callers
            that build several videos must pass distinct paths, otherwise
            each call overwrites the previous result.

    Returns:
        ``output_path``.
    """
    print("building video result")
    clip = ImageSequenceClip(frames, fps=fps)
    clip.write_videofile(output_path, fps=fps)

    return output_path


def _detect_target_box(image_ori, text_prompt, original_size,
                       box_threshold, text_threshold, iou_threshold):
    """Return the most confident GroundingDINO box for ``text_prompt``.

    The box is rescaled with the module-level ``transform`` and moved to
    ``device``. Raises ``ValueError`` when nothing is detected.
    """
    with torch.no_grad():
        detections, phrases = grounding_dino_model.predict_with_caption(
            image=cv2.cvtColor(image_ori, cv2.COLOR_RGB2BGR),
            caption=text_prompt,
            box_threshold=box_threshold,
            text_threshold=text_threshold,
        )

    if len(detections.xyxy) == 0:
        # np.argmax below would fail with an unhelpful message otherwise.
        raise ValueError(
            "No object matching text prompt {!r} was detected".format(text_prompt)
        )

    if len(detections.xyxy) > 1:
        # Drop heavily overlapping boxes before picking the best one.
        nms_idx = torchvision.ops.nms(
            torch.from_numpy(detections.xyxy),
            torch.from_numpy(detections.confidence),
            iou_threshold,
        ).numpy().tolist()
        detections.xyxy = detections.xyxy[nms_idx]
        detections.confidence = detections.confidence[nms_idx]

    bbox = detections.xyxy[np.argmax(detections.confidence)]
    bbox = transform.apply_boxes(bbox, original_size)
    return torch.as_tensor(bbox, dtype=torch.float).to(device)


def _prepare_image(image_ori):
    """Resize, normalize and pad an RGB array to a 1024x1024 tensor.

    Returns:
        ``(image, pad_size)`` where ``pad_size`` is the (height, width) of
        the tensor before padding, needed to crop the predictions again.
    """
    image = transform.apply_image(image_ori)
    image = torch.as_tensor(image).to(device)
    image = image.permute(2, 0, 1).contiguous()
    pixel_mean = torch.tensor([123.675, 116.28, 103.53]).view(3, 1, 1).to(device)
    pixel_std = torch.tensor([58.395, 57.12, 57.375]).view(3, 1, 1).to(device)
    image = (image - pixel_mean) / pixel_std

    pad_size = image.shape[-2:]
    h, w = pad_size
    # Pad only on the right/bottom so coordinates keep their origin.
    image = F.pad(image, (0, 1024 - w, 0, 1024 - h))
    return image, pad_size


def _scribble_centers(scribble):
    """Return the centers of mass of the connected strokes in a scribble mask."""
    # assumes scribble is an (H, W, C) array as produced by a Gradio sketch
    # input -- TODO confirm against the caller that provides it
    scribble = scribble.transpose(2, 1, 0)[0]
    labeled_array, num_features = ndimage.label(scribble >= 255)
    centers = ndimage.center_of_mass(scribble, labeled_array, range(1, num_features + 1))
    return np.array(centers)


def _predict_alpha(sample, guidance_mode):
    """Run MAM on ``sample`` and merge its multi-scale outputs into one matte.

    Returns:
        The alpha matte as a 2-D numpy array at the original image size.
    """
    pad_h, pad_w = sample['pad_shape'][0], sample['pad_shape'][1]
    with torch.no_grad():
        feas, pred, post_mask = mam_model.forward_inference(sample)

        # Crop away the padding, then scale back to the original resolution.
        alphas = {}
        for key in ('alpha_os1', 'alpha_os4', 'alpha_os8'):
            cropped = pred[key][..., :pad_h, :pad_w]
            alphas[key] = F.interpolate(
                cropped, sample['ori_shape'], mode="bilinear", align_corners=False
            )
        alpha_pred_os1 = alphas['alpha_os1']
        alpha_pred_os4 = alphas['alpha_os4']
        alpha_pred_os8 = alphas['alpha_os8']

        if guidance_mode == 'mask':
            weight_os8 = utils.get_unknown_tensor_from_mask_oneside(post_mask, rand_width=10, train_mode=False)
            post_mask[weight_os8 > 0] = alpha_pred_os8[weight_os8 > 0]
            alpha_pred = post_mask.clone().detach()
        else:
            weight_os8 = utils.get_unknown_box_from_mask(post_mask)
            alpha_pred_os8[weight_os8 > 0] = post_mask[weight_os8 > 0]
            alpha_pred = alpha_pred_os8.clone().detach()

        # Progressively replace the selected regions with the finer outputs.
        weight_os4 = utils.get_unknown_tensor_from_pred_oneside(alpha_pred, rand_width=20, train_mode=False)
        alpha_pred[weight_os4 > 0] = alpha_pred_os4[weight_os4 > 0]

        weight_os1 = utils.get_unknown_tensor_from_pred_oneside(alpha_pred, rand_width=10, train_mode=False)
        alpha_pred[weight_os1 > 0] = alpha_pred_os1[weight_os1 > 0]

        return alpha_pred[0][0].cpu().numpy()


def _blend(alpha_pred, foreground, background):
    """Alpha-composite ``foreground`` over ``background`` and return uint8."""
    alpha = alpha_pred[..., None]
    return np.uint8(alpha * foreground + (1 - alpha) * np.uint8(background))


def run_grounded_sam(input_image, text_prompt, task_type, background_prompt, *,
                     background_type="generated_by_text", box_threshold=0.25,
                     text_threshold=0.25, iou_threshold=0.5,
                     scribble_mode="split", guidance_mode="alpha"):
    """Matte the prompted subject out of one image and composite it.

    Args:
        input_image: RGB image as a numpy array, or a dict with an
            ``"image"`` array and a ``"mask"`` scribble array (the dict
            form is required for the scribble task types).
        text_prompt: Description of the subject (``task_type == 'text'``).
        task_type: ``'text'``, ``'scribble_point'`` or ``'scribble_box'``.
        background_prompt: Stable Diffusion prompt for the new background
            (used unless ``background_type == 'real_world_sample'``).
        background_type: ``'generated_by_text'`` or ``'real_world_sample'``
            (random file from ``assets/backgrounds``).
        box_threshold: GroundingDINO box confidence threshold.
        text_threshold: GroundingDINO text confidence threshold.
        iou_threshold: IoU threshold for non-maximum suppression.
        scribble_mode: ``'split'`` treats every scribble point as its own
            prompt; any other value keeps them together.
        guidance_mode: ``'mask'`` or ``'alpha'`` strategy for merging the
            coarse prediction with the mask.

    Returns:
        ``(com_img, green_img, alpha_rgb)``: the composite with the new
        background, the composite over the green-screen colour, and the
        alpha matte as an RGB image, all uint8 numpy arrays.

    Raises:
        ValueError: For an unknown ``task_type``, a missing prompt or
            scribble, or when the text prompt matches nothing.
    """
    if task_type not in ('text', 'scribble_point', 'scribble_box'):
        raise ValueError("task_type:{} error!".format(task_type))
    if task_type == 'text' and text_prompt is None:
        raise ValueError('Please input non-empty text prompt')
    if background_type != 'real_world_sample' and background_prompt is None:
        raise ValueError('Please input non-empty background prompt')

    # make dir
    os.makedirs(output_dir, exist_ok=True)

    # load image
    if isinstance(input_image, dict):
        image_ori = input_image["image"]
        scribble = input_image.get("mask")
    else:
        image_ori = input_image
        scribble = None
    if task_type != 'text' and scribble is None:
        raise ValueError(
            "task_type:{} requires input_image to provide a 'mask'".format(task_type)
        )
    original_size = image_ori.shape[:2]

    if task_type == 'text':
        bbox = _detect_target_box(
            image_ori, text_prompt, original_size,
            box_threshold, text_threshold, iou_threshold,
        )

    image, pad_size = _prepare_image(image_ori)

    if task_type == 'scribble_point':
        centers = _scribble_centers(scribble)
        ### (x,y)
        centers = transform.apply_coords(centers, original_size)
        point_coords = torch.from_numpy(centers).to(device)
        point_coords = point_coords.unsqueeze(0).to(device)
        point_labels = torch.from_numpy(np.array([1] * len(centers))).unsqueeze(0).to(device)
        if scribble_mode == 'split':
            point_coords = point_coords.permute(1, 0, 2)
            point_labels = point_labels.permute(1, 0)

        sample = {'image': image.unsqueeze(0), 'point': point_coords, 'label': point_labels,
                  'ori_shape': original_size, 'pad_shape': pad_size}
    elif task_type == 'scribble_box':
        centers = _scribble_centers(scribble)
        ### (x1, y1, x2, y2)
        x_min = centers[:, 0].min()
        x_max = centers[:, 0].max()
        y_min = centers[:, 1].min()
        y_max = centers[:, 1].max()
        bbox = np.array([x_min, y_min, x_max, y_max])
        bbox = transform.apply_boxes(bbox, original_size)
        bbox = torch.as_tensor(bbox, dtype=torch.float).to(device)

        sample = {'image': image.unsqueeze(0), 'bbox': bbox.unsqueeze(0),
                  'ori_shape': original_size, 'pad_shape': pad_size}
    else:  # task_type == 'text'
        sample = {'image': image.unsqueeze(0), 'bbox': bbox.unsqueeze(0),
                  'ori_shape': original_size, 'pad_shape': pad_size}

    alpha_pred = _predict_alpha(sample, guidance_mode)

    #### draw
    ### alpha matte
    alpha_rgb = cv2.cvtColor(np.uint8(alpha_pred * 255), cv2.COLOR_GRAY2RGB)

    ### com img with background
    if background_type == 'real_world_sample':
        background_img_file = os.path.join('assets/backgrounds', random.choice(background_list))
        background_img = cv2.imread(background_img_file)
        background_img = cv2.cvtColor(background_img, cv2.COLOR_BGR2RGB)
    else:
        # NOTE: a fresh image is generated on every call, so consecutive
        # video frames receive different backgrounds.
        background_img = np.array(generator(background_prompt).images[0])
    background_img = cv2.resize(background_img, (image_ori.shape[1], image_ori.shape[0]))
    com_img = _blend(alpha_pred, image_ori, background_img)

    ### com img with green screen
    green_img = _blend(alpha_pred, image_ori, np.array([PALETTE_back], dtype='uint8'))

    return com_img, green_img, alpha_rgb


def infer(video_in, trim_value, prompt, background_prompt):
    """Matte the prompted subject in the first ``trim_value`` seconds of a video.

    Args:
        video_in: Path of the uploaded video.
        trim_value: Number of seconds to process from the start.
        prompt: Text describing the subject to matte.
        background_prompt: Text describing the replacement background.

    Returns:
        Paths of three videos: composite with the new background, green
        screen composite, and alpha matte.

    Raises:
        ValueError: If the selection contains no frames.
    """
    print(prompt)
    frames_list, fps = get_frames(video_in)
    n_frame = int(trim_value * fps)

    if n_frame >= len(frames_list):
        print("video is shorter than the cut value")
        n_frame = len(frames_list)

    with_bg_result_frames = []
    with_green_result_frames = []
    with_matte_result_frames = []

    print("set stop frames to: " + str(n_frame))
    selected_frames = frames_list[0:int(n_frame)]
    if not selected_frames:
        # ImageSequenceClip cannot be built from an empty frame list.
        raise ValueError("no frames to process: check the video and the cut value")

    for i in selected_frames:
        # Convert the image to a NumPy array
        image_array = np.array(Image.open(i).convert("RGB"))

        bg_arr, green_arr, matte_arr = run_grounded_sam(image_array, prompt, "text", background_prompt)

        # exporting the images
        exports = (
            (bg_arr, f"bg_result_img-{i}.jpg", with_bg_result_frames),
            (green_arr, f"green_result_img-{i}.jpg", with_green_result_frames),
            (matte_arr, f"matte_result_img-{i}.jpg", with_matte_result_frames),
        )
        for array, file_name, collected in exports:
            Image.fromarray(array).save(file_name)
            collected.append(file_name)

        print("frame " + i + "/" + str(n_frame) + ": done;")

    # Distinct file names: otherwise each video would overwrite the last.
    vid_bg = create_video(with_bg_result_frames, fps, "video_bg_result.mp4")
    vid_green = create_video(with_green_result_frames, fps, "video_green_result.mp4")
    vid_matte = create_video(with_matte_result_frames, fps, "video_matte_result.mp4")

    print("finished !")

    return vid_bg, vid_green, vid_matte


if __name__ == "__main__":
    parser = argparse.ArgumentParser("MAM demo", add_help=True)
    parser.add_argument("--debug", action="store_true", help="using debug mode")
    parser.add_argument("--share", action="store_true", help="share the app")
    # NOTE: --port is parsed but not passed to launch(); the launch call
    # that used it (server_name='0.0.0.0', server_port=args.port) is disabled.
    parser.add_argument('--port', type=int, default=7589, help='port to run the server')
    parser.add_argument('--no-gradio-queue', action="store_true", help='disable the Gradio request queue')
    args = parser.parse_args()

    print(args)

    block = gr.Blocks()

    with block:
        gr.Markdown(
            """
# Matting Anything in Video Demo
Welcome to the Matting Anything in Video demo and upload your video to get started
You may open usage details below to understand how to use this demo.
## Usage
You may upload a video to start, for the moment we only support 1 prompt type to get the alpha matte of the target:
**text**: Send text prompt to identify the target instance in the `Text prompt` box.
We also only support 1 background type to support image composition with the alpha matte output:
**generated_by_text**: Send background text prompt to create a background image with stable diffusion model in the `Background prompt` box.
""")

        with gr.Row():
            with gr.Column():
                video_in = gr.Video(source='upload', type="filepath")
                trim_in = gr.Slider(label="Cut video at (s)", minimum=1, maximum=5, step=1, value=1)
                text_prompt = gr.Textbox(
                    label="Text prompt",
                    placeholder="the girl in the middle",
                    info="Describe the subject visible in your video that you want to matte",
                )
                background_prompt = gr.Textbox(label="Background prompt", placeholder="downtown area in New York")
                run_button = gr.Button(label="Run")

            with gr.Column():
                vid_bg_out = gr.Video(label="Video with background")
                vid_green_out = gr.Video(label="Video green screen")
                vid_matte_out = gr.Video(label="Video matte")

        run_button.click(
            fn=infer,
            inputs=[video_in, trim_in, text_prompt, background_prompt],
            outputs=[vid_bg_out, vid_green_out, vid_matte_out],
        )

    # Honour --no-gradio-queue instead of always enabling the queue.
    if not args.no_gradio_queue:
        block = block.queue(max_size=12)
    block.launch(debug=args.debug, share=args.share, show_error=True)