import os
import shutil
import time
from importlib.util import find_spec
from pathlib import Path
from typing import Literal
import gradio as gr
import librosa
import numpy as np
import python_speech_features
import spaces
import torch
from moviepy.editor import (
AudioFileClip,
ImageClip,
VideoFileClip,
concatenate_videoclips,
)
from PIL import Image
from torch import Tensor
from torchvision.transforms import ToPILImage
from tqdm import tqdm
from visualizr import (
FRAMES_RESULT_SAVED_PATH,
MOTION_DIM,
RESULTS_DIR,
STAGE_1_CHECKPOINT_PATH,
TMP_MP4,
logger,
model_mapping,
)
from visualizr.config import TrainConfig
from visualizr.experiment import LitModel
from visualizr.LIA_Model import LIA_Model
from visualizr.templates import ffhq256_autoenc
def check_package_installed(package_name: str) -> bool:
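    """Return True if *package_name* is importable in the current environment."""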
return find_spec(package_name) is not None
def frames_to_video(
    input_path: str, audio_path: str, output_path: str, fps: int = 25
) -> None:
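    """Stitch the frames in *input_path* into a video and mux in *audio_path*."""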
image_files = [
os.path.join(input_path, img) for img in sorted(os.listdir(input_path))
]
clips = [ImageClip(m).set_duration(1 / fps) for m in image_files]
video = concatenate_videoclips(clips, method="compose")
audio = AudioFileClip(audio_path)
final_video = video.set_audio(audio)
    final_video.write_videofile(
        output_path, fps=fps, codec="libx264", audio_codec="aac"
    )
def load_image(filename: str, size: int) -> np.ndarray:
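    """Load an image, resize it to (size, size), and return a CHW array in [0, 1]."""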
img: Image.Image = Image.open(filename).convert("RGB")
img_resized: Image.Image = img.resize((size, size))
img_np: np.ndarray = np.asarray(img_resized)
    img_transposed: np.ndarray = np.transpose(img_np, (2, 0, 1))  # HWC -> CHW
return img_transposed / 255.0
def img_preprocessing(img_path: str, size: int) -> Tensor:
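    """Load an image and convert it to a 1x3xHxW tensor normalized to [-1, 1]."""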
img_np: np.ndarray = load_image(img_path, size) # [0, 1]
img: Tensor = torch.from_numpy(img_np).unsqueeze(0).float() # [0, 1]
normalized_image: Tensor = (img - 0.5) * 2.0 # [-1, 1]
return normalized_image
def saved_image(img_tensor: Tensor, img_path: str) -> None:
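    """Save a 1xCxHxW image tensor with values in [0, 1] to *img_path*."""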
pil_image_converter: ToPILImage = ToPILImage()
img = pil_image_converter(img_tensor.detach().cpu().squeeze(0))
img.save(img_path)
def load_stage_1_model() -> LIA_Model:
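    """Load the stage-1 LIA model from its checkpoint onto the GPU."""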
logger.info("Loading stage 1 model... ")
lia: LIA_Model = LIA_Model(motion_dim=MOTION_DIM, fusion_type="weighted_sum")
lia.load_lightning_model(STAGE_1_CHECKPOINT_PATH)
lia.to("cuda")
return lia
def load_stage_2_model(conf: TrainConfig, stage2_checkpoint_path: str) -> LitModel:
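    """Load the stage-2 diffusion model and put its EMA weights on the GPU."""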
logger.info("Loading stage 2 model... ")
model = LitModel(conf)
    state = torch.load(stage2_checkpoint_path, map_location="cpu")
model.load_state_dict(state)
model.ema_model.eval()
model.ema_model.to("cuda")
return model
def init_conf(
infer_type: Literal[
"mfcc_full_control",
"mfcc_pose_only",
"hubert_pose_only",
"hubert_audio_only",
"hubert_full_control",
],
seed: int,
) -> TrainConfig:
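    """Build the TrainConfig for *infer_type*, toggling the matching control flags."""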
logger.info("Initializing configuration... ")
conf: TrainConfig = ffhq256_autoenc()
conf.seed = seed
conf.decoder_layers = 2
conf.infer_type = infer_type
conf.motion_dim = MOTION_DIM
logger.info(f"infer_type: {infer_type}")
    match infer_type:
        case "mfcc_full_control":
            conf.face_location = True
            conf.face_scale = True
            conf.mfcc = True
        case "mfcc_pose_only":
            conf.face_location = False
            conf.face_scale = False
            conf.mfcc = True
        case "hubert_pose_only" | "hubert_audio_only":
            conf.face_location = False
            conf.face_scale = False
            conf.mfcc = False
        case "hubert_full_control":
            conf.face_location = True
            conf.face_scale = True
            conf.mfcc = False
return conf
def main(
infer_type: Literal[
"mfcc_full_control",
"mfcc_pose_only",
"hubert_pose_only",
"hubert_audio_only",
"hubert_full_control",
],
image_path: str,
test_audio_path: str,
face_sr: bool,
pose_yaw: float,
pose_pitch: float,
pose_roll: float,
face_location: float,
face_scale: float,
step_t: int,
seed: int,
stage2_checkpoint_path: str,
):
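    """Generate a talking-head video from a portrait image and a driving audio clip.

    Returns the 256x256 video path twice, or the 256x256 and super-resolved
    512x512 paths when face super-resolution runs.
    """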
    if not os.path.exists(image_path):
        logger.error(f"{image_path} does not exist!")
        raise FileNotFoundError(image_path)
    if not os.path.exists(test_audio_path):
        logger.error(f"{test_audio_path} does not exist!")
        raise FileNotFoundError(test_audio_path)
image_name: str = Path(image_path).stem
audio_name: str = Path(test_audio_path).stem
predicted_video_256_path: Path = RESULTS_DIR / f"{image_name}-{audio_name}.mp4"
predicted_video_512_path: Path = RESULTS_DIR / f"{image_name}-{audio_name}_SR.mp4"
# ======Loading Stage 1 model=========
lia: LIA_Model = load_stage_1_model()
# ============================
conf: TrainConfig = init_conf(infer_type, seed)
img_source: Tensor = img_preprocessing(image_path, 256).to("cuda")
one_shot_lia_start, one_shot_lia_direction, feats = lia.get_start_direction_code(
img_source, img_source, img_source, img_source
)
# ======Loading Stage 2 model=========
model = load_stage_2_model(conf, stage2_checkpoint_path)
# =================================
# ======Audio Input=========
if conf.infer_type.startswith("mfcc"):
# MFCC features
wav, sr = librosa.load(test_audio_path, sr=16000)
input_values = python_speech_features.mfcc(
signal=wav, samplerate=sr, numcep=13, winlen=0.025, winstep=0.01
)
d_mfcc_feat = python_speech_features.base.delta(input_values, 1)
d_mfcc_feat2 = python_speech_features.base.delta(input_values, 2)
audio_driven_obj: np.ndarray = np.hstack(
(input_values, d_mfcc_feat, d_mfcc_feat2)
)
frame_start, frame_end = 0, int(audio_driven_obj.shape[0] / 4)
audio_start, audio_end = (
int(frame_start * 4),
int(frame_end * 4),
) # The video frame is fixed to 25 hz, and the audio is fixed to 100 hz
audio_driven = (
torch.Tensor(audio_driven_obj[audio_start:audio_end, :])
.unsqueeze(0)
.float()
.to("cuda")
)
elif conf.infer_type.startswith("hubert"):
# Hubert features
if not check_package_installed("transformers"):
logger.exception("Please install transformers module first.")
sys.exit(0)
hubert_model_path = "ckpts/chinese-hubert-large"
if not os.path.exists(hubert_model_path):
            logger.error(
                "Please download the HuBERT weights into the ckpts directory first."
            )
            raise FileNotFoundError(hubert_model_path)
        logger.info(
            "Audio features were not extracted in advance; "
            "extracting them online now, which adds processing delay"
        )
start_time = time.time()
# load hubert model
from transformers import HubertModel, Wav2Vec2FeatureExtractor
audio_model = HubertModel.from_pretrained(hubert_model_path).to("cuda")
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(hubert_model_path)
audio_model.feature_extractor._freeze_parameters() # skipcq: PYL-W0212
audio_model.eval()
# hubert model forward pass
audio, sr = librosa.load(test_audio_path, sr=16000)
input_values = feature_extractor(
audio,
sampling_rate=16000,
padding=True,
do_normalize=True,
return_tensors="pt",
).input_values
input_values = input_values.to("cuda")
        with torch.no_grad():
            outputs = audio_model(input_values, output_hidden_states=True)
        # Keep every transformer layer's hidden states as the audio feature
        ws_feats = [
            hidden_state.detach().cpu().numpy()
            for hidden_state in outputs.hidden_states
        ]
        ws_feat_obj = np.array(ws_feats)
        ws_feat_obj = np.squeeze(ws_feat_obj, 1)
ws_feat_obj = np.pad(
ws_feat_obj, ((0, 0), (0, 1), (0, 0)), "edge"
) # align the audio length with the video frame
execution_time = time.time() - start_time
logger.info(f"Extraction Audio Feature: {execution_time:.2f} Seconds")
audio_driven_obj = ws_feat_obj
frame_start, frame_end = 0, int(audio_driven_obj.shape[1] / 2)
audio_start, audio_end = (
int(frame_start * 2),
int(frame_end * 2),
) # The video frame is fixed to 25 hz, and the audio is fixed to 50 hz
audio_driven = (
torch.Tensor(audio_driven_obj[:, audio_start:audio_end, :])
.unsqueeze(0)
.float()
.to("cuda")
)
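    else:
        # Defensive guard; infer_type is constrained by the Literal annotation above
        raise ValueError(f"Unsupported infer_type: {conf.infer_type}")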
# ============================
# Diffusion Noise
noisy_t = torch.randn((1, frame_end, MOTION_DIM)).to("cuda")
# ======Inputs for Attribute Control=========
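    # Each control signal is a constant value broadcast across all frames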
yaw_signal = torch.zeros(1, frame_end, 1).to("cuda") + pose_yaw
pitch_signal = torch.zeros(1, frame_end, 1).to("cuda") + pose_pitch
roll_signal = torch.zeros(1, frame_end, 1).to("cuda") + pose_roll
pose_signal = torch.cat((yaw_signal, pitch_signal, roll_signal), dim=-1)
pose_signal = torch.clamp(pose_signal, -1, 1)
face_location_signal = torch.zeros(1, frame_end, 1).to("cuda") + face_location
face_scale_tensor = torch.zeros(1, frame_end, 1).to("cuda") + face_scale
# ===========================================
start_time = time.time()
    # ======Diffusion Denoising Process=========
generated_directions = model.render(
one_shot_lia_start,
one_shot_lia_direction,
audio_driven,
face_location_signal,
face_scale_tensor,
pose_signal,
noisy_t,
step_t,
True,
)
# =========================================
execution_time = time.time() - start_time
logger.info(f"Motion Diffusion Model: {execution_time:.2f} Seconds")
generated_directions = generated_directions.detach().cpu().numpy()
start_time = time.time()
# ======Rendering images frame-by-frame=========
for pred_index in tqdm(range(generated_directions.shape[1])):
ori_img_recon = lia.render(
one_shot_lia_start,
torch.Tensor(generated_directions[:, pred_index, :]).to("cuda"),
feats,
)
ori_img_recon = ori_img_recon.clamp(-1, 1)
        img_pred = (ori_img_recon.detach() + 1) / 2  # [-1, 1] -> [0, 1]
        saved_image(
            img_pred, os.path.join(FRAMES_RESULT_SAVED_PATH, f"{pred_index:06d}.png")
)
# ==============================================
execution_time = time.time() - start_time
logger.info(f"Renderer Model: {execution_time:.2f} Seconds")
logger.info(f"Saving video at {predicted_video_256_path}")
frames_to_video(
str(FRAMES_RESULT_SAVED_PATH),
test_audio_path,
str(predicted_video_256_path),
)
shutil.rmtree(FRAMES_RESULT_SAVED_PATH)
    # Enhancer: upscale the 256x256 result with GFPGAN when requested and available
    if face_sr and check_package_installed("gfpgan"):
        from imageio import mimsave

        from visualizr.face_sr.face_enhancer import enhancer_list

        # Write the upscaled frames to a temporary file next to the final output
        tmp_video_path: Path = predicted_video_512_path.parent / TMP_MP4
        # Super-resolution
        mimsave(
            str(tmp_video_path),
            enhancer_list(str(predicted_video_256_path), bg_upsampler=None),
            fps=25.0,
        )
        # Merge audio and video
        video_clip = VideoFileClip(str(tmp_video_path))
        audio_clip = AudioFileClip(str(predicted_video_256_path))
        final_clip = video_clip.set_audio(audio_clip)
        final_clip.write_videofile(
            str(predicted_video_512_path), codec="libx264", audio_codec="aac"
        )
        os.remove(tmp_video_path)
        return predicted_video_256_path, predicted_video_512_path
    # Without super-resolution, the 256x256 video is returned for both outputs
    return predicted_video_256_path, predicted_video_256_path
@spaces.GPU(duration=300)
def generate_video(
uploaded_img: str,
uploaded_audio: str,
infer_type: Literal[
"mfcc_full_control",
"mfcc_pose_only",
"hubert_pose_only",
"hubert_audio_only",
"hubert_full_control",
],
pose_yaw: float,
pose_pitch: float,
pose_roll: float,
face_location: float,
face_scale: float,
step_t: int,
face_sr: bool,
seed: int,
):
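    """Gradio handler: validate inputs, run the pipeline, and wrap the outputs."""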
    if not uploaded_img or not uploaded_audio:
        return (
            None,
            None,
            gr.Markdown(
                "Error: Input image or audio file is empty. "
                "Please check and upload both files."
            ),
        )
try:
output_256_video_path, output_512_video_path = main(
infer_type,
uploaded_img,
uploaded_audio,
face_sr,
pose_yaw,
pose_pitch,
pose_roll,
face_location,
face_scale,
step_t,
seed,
model_mapping.get(
infer_type,
"default_checkpoint.ckpt",
),
)
if not os.path.exists(output_256_video_path):
return None, gr.Markdown(
"Error: Video generation failed. "
+ "Please check your inputs and try again."
)
if output_256_video_path == output_512_video_path:
return (
gr.Video(value=output_256_video_path),
None,
gr.Markdown("Video (256*256 only) generated successfully!"),
)
return (
gr.Video(value=output_256_video_path),
gr.Video(value=output_512_video_path),
gr.Markdown("Video generated successfully!"),
)
    except Exception as e:
        logger.exception("Video generation failed")
        return (
            None,
            None,
            gr.Markdown(f"Error: An unexpected error occurred - {e}"),
        )