import os
import sys

# Clone AV-HuBERT and install its dependencies at startup.
os.system('git clone https://github.com/facebookresearch/av_hubert.git')
os.chdir('/home/user/app/av_hubert')
os.system('git submodule init')
os.system('git submodule update')
os.chdir('/home/user/app/av_hubert/fairseq')
os.system('pip install ./')
os.system('pip install scipy')
os.system('pip install sentencepiece')
os.system('pip install python_speech_features')
os.system('pip install scikit-video')
os.system('pip install transformers')
os.system('pip install gradio==3.12')
os.system('pip install numpy==1.23.3')
os.chdir('/home/user/app')
os.makedirs("./result", exist_ok=True)
os.makedirs("./video/và/test", exist_ok=True)
# sys.path.append('/home/user/app/av_hubert')
sys.path.append('/home/user/app/av_hubert/avhubert')
print(sys.path)
print(os.listdir())
print(sys.argv, type(sys.argv))
sys.argv.append('dummy')
import dlib, cv2, os
import numpy as np
import skvideo
import skvideo.io
from tqdm import tqdm
from preparation.align_mouth import landmarks_interpolate, crop_patch, write_video_ffmpeg
from base64 import b64encode
import torch
import cv2
import tempfile
from argparse import Namespace
import fairseq
from fairseq import checkpoint_utils, options, tasks, utils
from fairseq.dataclass.configs import GenerationConfig
from huggingface_hub import hf_hub_download
import gradio as gr
from pytube import YouTube

# os.chdir('/home/user/app/av_hubert/avhubert')
user_dir = "/home/user/app/av_hubert/avhubert"
utils.import_user_module(Namespace(user_dir=user_dir))
data_dir = "/home/user/app/video"
# ckpt_path = hf_hub_download('vumichien/AV-HuBERT', 'model.pt')
face_detector_path = "/home/user/app/mmod_human_face_detector.dat"
face_predictor_path = "/home/user/app/shape_predictor_68_face_landmarks.dat"
mean_face_path = "/home/user/app/20words_mean_face.npy"
mouth_roi_path = "/home/user/app/roi.mp4"
output_video_path = "/home/user/app/video/và/test"
modalities = ["video"]
gen_subset = "test"
gen_cfg = GenerationConfig(beam=20)
# models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task([ckpt_path])
# models = [model.eval().cuda() if torch.cuda.is_available() else model.eval() for model in models]
# saved_cfg.task.modalities = modalities
# saved_cfg.task.data = data_dir
# saved_cfg.task.label_dir = data_dir
# task = tasks.setup_task(saved_cfg.task)
# generator = task.build_generator(models, gen_cfg)
def get_youtube(video_url):
    yt = YouTube(video_url)
    abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download()
    print("Successfully downloaded video")
    print(abs_video_path)
    return abs_video_path
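
# --- Hedged usage sketch (added; not part of the original app) ---
# Shows how get_youtube() could be exercised outside the Gradio UI, assuming
# network access and that pytube can resolve the URL. The URL is one of the
# demo examples listed later in this script; _example_download is a
# hypothetical helper and is never called by the app.
def _example_download():
    path = get_youtube("https://www.youtube.com/watch?v=ZXVDnuepW2s")
    print("Downloaded to:", path)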
import dlib, cv2, os
import numpy as np
import skvideo
import skvideo.io
from tqdm import tqdm
from preparation.align_mouth import landmarks_interpolate, crop_patch, write_video_ffmpeg
from IPython.display import HTML
from base64 import b64encode
import numpy as np
def convert_bgr2gray(data):
    # np.stack(array_1, array_2, axis=0): stacks the arrays along a brand-new axis.
    return np.stack([cv2.cvtColor(_, cv2.COLOR_BGR2GRAY) for _ in data], axis=0)
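
# Hedged shape example (added): for a hypothetical clip of 75 BGR frames at
# 96x96, convert_bgr2gray maps a (75, 96, 96, 3) uint8 array to a (75, 96, 96)
# grayscale stack. This helper is illustrative only and is never called.
def _example_gray_shapes():
    dummy_frames = np.zeros((75, 96, 96, 3), dtype=np.uint8)  # fake BGR frames
    gray = convert_bgr2gray(dummy_frames)
    print(gray.shape)  # (75, 96, 96)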
def save2npz(filename, data=None):
    """save2npz.

    :param filename: str, the filename where the data will be saved.
    :param data: ndarray, arrays to save to the file.
    """
    assert data is not None, "data is {}".format(data)
    if not os.path.exists(os.path.dirname(filename)):
        os.makedirs(os.path.dirname(filename))
    np.savez_compressed(filename, data=data)
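
# Hedged round-trip sketch (added): save2npz stores the array under the key
# "data", and np.savez_compressed appends ".npz" when the filename lacks that
# extension, so the file can be read back with np.load. The path argument
# below is illustrative.
def _example_load_npz(path="/home/user/app/video/và/test.npz"):
    return np.load(path)["data"]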
def detect_landmark(image, detector, predictor):
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    face_locations = detector(gray, 1)
    coords = None
    for (_, face_location) in enumerate(face_locations):
        if torch.cuda.is_available():
            # The CNN detector returns mmod_rectangle objects; the box is in .rect.
            rect = face_location.rect
        else:
            # The HOG detector returns plain rectangles.
            rect = face_location
        shape = predictor(gray, rect)
        coords = np.zeros((68, 2), dtype=np.int32)
        for i in range(0, 68):
            coords[i] = (shape.part(i).x, shape.part(i).y)
    return coords
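
# Hedged single-image sketch (added): builds the HOG detector and the 68-point
# predictor and runs detect_landmark on one frame. It assumes the predictor
# file exists at face_predictor_path and that "frame.jpg" (hypothetical)
# contains a face; the result is a (68, 2) int array, or None when no face is
# found. Never called by the app.
def _example_detect_one_frame(image_path="frame.jpg"):
    hog_detector = dlib.get_frontal_face_detector()
    predictor68 = dlib.shape_predictor(face_predictor_path)
    frame = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
    return detect_landmark(frame, hog_detector, predictor68)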
def preprocess_video(input_video_path):
    # Use the CNN face detector when CUDA is available, otherwise the HOG detector.
    if torch.cuda.is_available():
        detector = dlib.cnn_face_detection_model_v1(face_detector_path)
    else:
        detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(face_predictor_path)
    STD_SIZE = (256, 256)
    mean_face_landmarks = np.load(mean_face_path)
    stablePntsIDs = [33, 36, 39, 42, 45]  # nose tip and eye corners, used for alignment
    videogen = skvideo.io.vread(input_video_path)
    frames = np.array([frame for frame in videogen])
    landmarks = []
    for frame in tqdm(frames):
        landmark = detect_landmark(frame, detector, predictor)
        landmarks.append(landmark)
    preprocessed_landmarks = landmarks_interpolate(landmarks)
    # Landmark indices 48-67 cover the mouth; crop a 96x96 patch around it.
    rois = crop_patch(input_video_path, preprocessed_landmarks, mean_face_landmarks, stablePntsIDs, STD_SIZE,
                      window_margin=12, start_idx=48, stop_idx=68, crop_height=96, crop_width=96)
    rois_gray = convert_bgr2gray(rois)
    save2npz(output_video_path, data=rois_gray)
    write_video_ffmpeg(rois, mouth_roi_path, "/usr/bin/ffmpeg")
    return mouth_roi_path
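
# Hedged end-to-end sketch (added): runs the full ROI pipeline on a local file.
# "uploaded.mp4" is a hypothetical path; the returned path is the mouth-ROI
# clip written by write_video_ffmpeg, and the grayscale ROI stack is saved
# under output_video_path (plus ".npz") as a side effect. Never called by the app.
def _example_preprocess(video_path="uploaded.mp4"):
    roi_clip = preprocess_video(video_path)
    print("Mouth ROI video:", roi_clip)
    return roi_clip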
def predict(process_video):
    os.chdir('/home/user/app')
    # os.system returns the shell's exit status, which is what ends up in the
    # output textbox.
    return os.system('bash TestVisual.sh')
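
# Hedged alternative sketch (added): if the goal is to show the script's text
# output rather than its exit status, subprocess.run can capture stdout.
# TestVisual.sh is the same script invoked above; whether its stdout contains
# the decoded transcript is an assumption. Never called by the app.
import subprocess
def _predict_with_output():
    result = subprocess.run(["bash", "TestVisual.sh"], cwd="/home/user/app",
                            capture_output=True, text=True)
    return result.stdout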
# ---- Gradio Layout -----
youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True)
video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True)
video_out = gr.Video(label="Audio Visual Video", mirror_webcam=False, interactive=True)
demo = gr.Blocks()
demo.encrypt = False
text_output = gr.Textbox()
with demo:
    # gr.Markdown('''
    # <div>
    # <h1 style='text-align: center'>Speech Recognition from Visual Lip Movement by Audio-Visual Hidden Unit BERT Model (AV-HuBERT)</h1>
    # This space uses AV-HuBERT models from <a href='https://github.com/facebookresearch' target='_blank'><b>Meta Research</b></a> to recognize speech from lip movement 🤗
    # <figure>
    # <img src="https://huggingface.co/vumichien/AV-HuBERT/resolve/main/lipreading.gif" alt="Audio-Visual Speech Recognition">
    # <figcaption> Speech recognition from visual lip movement
    # </figcaption>
    # </figure>
    # </div>
    # ''')
    # with gr.Row():
    #     gr.Markdown('''
    #     ### Reading lip movement from a YouTube link using AV-HuBERT
    #     ##### Step 1a. Download a video from YouTube (note: the video should be shorter than 10 seconds, otherwise it will be cut, and the face should be stable for a better result)
    #     ##### Step 1b. You can also upload a video directly
    #     ##### Step 2. Generate landmarks around the mouth area
    #     ##### Step 3. Read the lip movement.
    #     ''')
    with gr.Row():
        gr.Markdown('''
        ### You can test with the following examples:
        ''')
        examples = gr.Examples(
            examples=["https://www.youtube.com/watch?v=ZXVDnuepW2s",
                      "https://www.youtube.com/watch?v=X8_glJn1B8o",
                      "https://www.youtube.com/watch?v=80yqL2KzBVw"],
            label="Examples", inputs=[youtube_url_in])
    with gr.Column():
        youtube_url_in.render()
        download_youtube_btn = gr.Button("Download YouTube video")
        download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in])
        print(video_in)
    with gr.Row():
        video_in.render()
        video_out.render()
    with gr.Row():
        detect_landmark_btn = gr.Button("Detect landmarks / crop mouth")
        detect_landmark_btn.click(preprocess_video, [video_in], [video_out])
        predict_btn = gr.Button("Predict")
        predict_btn.click(predict, [video_out], [text_output])
    with gr.Row():
        # video_lip = gr.Video(label="Audio Visual Video", mirror_webcam=False)
        text_output.render()

demo.launch(debug=True)