| | import os |
| | import cv2 |
| | import yaml |
| | import numpy as np |
| | import warnings |
| | from skimage import img_as_ubyte |
| | import safetensors |
| | import safetensors.torch |
| | warnings.filterwarnings('ignore') |
| |
|
| |
|
| | import imageio |
| | import torch |
| | import torchvision |
| |
|
| |
|
| | from src.facerender.modules.keypoint_detector import HEEstimator, KPDetector |
| | from src.facerender.modules.mapping import MappingNet |
| | from src.facerender.modules.generator import OcclusionAwareGenerator, OcclusionAwareSPADEGenerator |
| | from src.facerender.modules.make_animation import make_animation |
| |
|
| | from pydub import AudioSegment |
| | from src.utils.face_enhancer import enhancer_generator_with_len, enhancer_list |
| | from src.utils.paste_pic import paste_pic |
| | from src.utils.videoio import save_video_with_watermark |
| |
|
| | try: |
| | import webui |
| | in_webui = True |
| | except: |
| | in_webui = False |
| |
|
| | class AnimateFromCoeff(): |
| |
|
| | def __init__(self, sadtalker_path, device): |
| |
|
| | with open(sadtalker_path['facerender_yaml']) as f: |
| | config = yaml.safe_load(f) |
| |
|
| | generator = OcclusionAwareSPADEGenerator(**config['model_params']['generator_params'], |
| | **config['model_params']['common_params']) |
| | kp_extractor = KPDetector(**config['model_params']['kp_detector_params'], |
| | **config['model_params']['common_params']) |
| | he_estimator = HEEstimator(**config['model_params']['he_estimator_params'], |
| | **config['model_params']['common_params']) |
| | mapping = MappingNet(**config['model_params']['mapping_params']) |
| |
|
| | generator.to(device) |
| | kp_extractor.to(device) |
| | he_estimator.to(device) |
| | mapping.to(device) |
| | for param in generator.parameters(): |
| | param.requires_grad = False |
| | for param in kp_extractor.parameters(): |
| | param.requires_grad = False |
| | for param in he_estimator.parameters(): |
| | param.requires_grad = False |
| | for param in mapping.parameters(): |
| | param.requires_grad = False |
| |
|
| | if sadtalker_path is not None: |
| | if 'checkpoint' in sadtalker_path: |
| | self.load_cpk_facevid2vid_safetensor(sadtalker_path['checkpoint'], kp_detector=kp_extractor, generator=generator, he_estimator=None) |
| | else: |
| | self.load_cpk_facevid2vid(sadtalker_path['free_view_checkpoint'], kp_detector=kp_extractor, generator=generator, he_estimator=he_estimator) |
| | else: |
| | raise AttributeError("Checkpoint should be specified for video head pose estimator.") |
| |
|
| | if sadtalker_path['mappingnet_checkpoint'] is not None: |
| | self.load_cpk_mapping(sadtalker_path['mappingnet_checkpoint'], mapping=mapping) |
| | else: |
| | raise AttributeError("Checkpoint should be specified for video head pose estimator.") |
| |
|
| | self.kp_extractor = kp_extractor |
| | self.generator = generator |
| | self.he_estimator = he_estimator |
| | self.mapping = mapping |
| |
|
| | self.kp_extractor.eval() |
| | self.generator.eval() |
| | self.he_estimator.eval() |
| | self.mapping.eval() |
| | |
| | self.device = device |
| | |
| | def load_cpk_facevid2vid_safetensor(self, checkpoint_path, generator=None, |
| | kp_detector=None, he_estimator=None, |
| | device="cpu"): |
| |
|
| | checkpoint = safetensors.torch.load_file(checkpoint_path) |
| |
|
| | if generator is not None: |
| | x_generator = {} |
| | for k,v in checkpoint.items(): |
| | if 'generator' in k: |
| | x_generator[k.replace('generator.', '')] = v |
| | generator.load_state_dict(x_generator) |
| | if kp_detector is not None: |
| | x_generator = {} |
| | for k,v in checkpoint.items(): |
| | if 'kp_extractor' in k: |
| | x_generator[k.replace('kp_extractor.', '')] = v |
| | kp_detector.load_state_dict(x_generator) |
| | if he_estimator is not None: |
| | x_generator = {} |
| | for k,v in checkpoint.items(): |
| | if 'he_estimator' in k: |
| | x_generator[k.replace('he_estimator.', '')] = v |
| | he_estimator.load_state_dict(x_generator) |
| | |
| | return None |
| |
|
| | def load_cpk_facevid2vid(self, checkpoint_path, generator=None, discriminator=None, |
| | kp_detector=None, he_estimator=None, optimizer_generator=None, |
| | optimizer_discriminator=None, optimizer_kp_detector=None, |
| | optimizer_he_estimator=None, device="cpu"): |
| | checkpoint = torch.load(checkpoint_path, map_location=torch.device(device)) |
| | if generator is not None: |
| | generator.load_state_dict(checkpoint['generator']) |
| | if kp_detector is not None: |
| | kp_detector.load_state_dict(checkpoint['kp_detector']) |
| | if he_estimator is not None: |
| | he_estimator.load_state_dict(checkpoint['he_estimator']) |
| | if discriminator is not None: |
| | try: |
| | discriminator.load_state_dict(checkpoint['discriminator']) |
| | except: |
| | print ('No discriminator in the state-dict. Dicriminator will be randomly initialized') |
| | if optimizer_generator is not None: |
| | optimizer_generator.load_state_dict(checkpoint['optimizer_generator']) |
| | if optimizer_discriminator is not None: |
| | try: |
| | optimizer_discriminator.load_state_dict(checkpoint['optimizer_discriminator']) |
| | except RuntimeError as e: |
| | print ('No discriminator optimizer in the state-dict. Optimizer will be not initialized') |
| | if optimizer_kp_detector is not None: |
| | optimizer_kp_detector.load_state_dict(checkpoint['optimizer_kp_detector']) |
| | if optimizer_he_estimator is not None: |
| | optimizer_he_estimator.load_state_dict(checkpoint['optimizer_he_estimator']) |
| |
|
| | return checkpoint['epoch'] |
| | |
| | def load_cpk_mapping(self, checkpoint_path, mapping=None, discriminator=None, |
| | optimizer_mapping=None, optimizer_discriminator=None, device='cpu'): |
| | checkpoint = torch.load(checkpoint_path, map_location=torch.device(device)) |
| | if mapping is not None: |
| | mapping.load_state_dict(checkpoint['mapping']) |
| | if discriminator is not None: |
| | discriminator.load_state_dict(checkpoint['discriminator']) |
| | if optimizer_mapping is not None: |
| | optimizer_mapping.load_state_dict(checkpoint['optimizer_mapping']) |
| | if optimizer_discriminator is not None: |
| | optimizer_discriminator.load_state_dict(checkpoint['optimizer_discriminator']) |
| |
|
| | return checkpoint['epoch'] |
| |
|
| | def generate(self, x, video_save_dir, pic_path, crop_info, enhancer=None, background_enhancer=None, preprocess='crop', img_size=256): |
| |
|
| | source_image=x['source_image'].type(torch.FloatTensor) |
| | source_semantics=x['source_semantics'].type(torch.FloatTensor) |
| | target_semantics=x['target_semantics_list'].type(torch.FloatTensor) |
| | source_image=source_image.to(self.device) |
| | source_semantics=source_semantics.to(self.device) |
| | target_semantics=target_semantics.to(self.device) |
| | if 'yaw_c_seq' in x: |
| | yaw_c_seq = x['yaw_c_seq'].type(torch.FloatTensor) |
| | yaw_c_seq = x['yaw_c_seq'].to(self.device) |
| | else: |
| | yaw_c_seq = None |
| | if 'pitch_c_seq' in x: |
| | pitch_c_seq = x['pitch_c_seq'].type(torch.FloatTensor) |
| | pitch_c_seq = x['pitch_c_seq'].to(self.device) |
| | else: |
| | pitch_c_seq = None |
| | if 'roll_c_seq' in x: |
| | roll_c_seq = x['roll_c_seq'].type(torch.FloatTensor) |
| | roll_c_seq = x['roll_c_seq'].to(self.device) |
| | else: |
| | roll_c_seq = None |
| |
|
| | frame_num = x['frame_num'] |
| |
|
| | predictions_video = make_animation(source_image, source_semantics, target_semantics, |
| | self.generator, self.kp_extractor, self.he_estimator, self.mapping, |
| | yaw_c_seq, pitch_c_seq, roll_c_seq, use_exp = True) |
| |
|
| | predictions_video = predictions_video.reshape((-1,)+predictions_video.shape[2:]) |
| | predictions_video = predictions_video[:frame_num] |
| |
|
| | video = [] |
| | for idx in range(predictions_video.shape[0]): |
| | image = predictions_video[idx] |
| | image = np.transpose(image.data.cpu().numpy(), [1, 2, 0]).astype(np.float32) |
| | video.append(image) |
| | result = img_as_ubyte(video) |
| |
|
| | |
| | original_size = crop_info[0] |
| | if original_size: |
| | result = [ cv2.resize(result_i,(img_size, int(img_size * original_size[1]/original_size[0]) )) for result_i in result ] |
| | |
| | video_name = x['video_name'] + '.mp4' |
| | path = os.path.join(video_save_dir, 'temp_'+video_name) |
| | |
| | imageio.mimsave(path, result, fps=float(25)) |
| |
|
| | av_path = os.path.join(video_save_dir, video_name) |
| | return_path = av_path |
| | |
| | audio_path = x['audio_path'] |
| | audio_name = os.path.splitext(os.path.split(audio_path)[-1])[0] |
| | new_audio_path = os.path.join(video_save_dir, audio_name+'.wav') |
| | start_time = 0 |
| | |
| | sound = AudioSegment.from_file(audio_path) |
| | frames = frame_num |
| | end_time = start_time + frames*1/25*1000 |
| | word1=sound.set_frame_rate(16000) |
| | word = word1[start_time:end_time] |
| | word.export(new_audio_path, format="wav") |
| |
|
| | save_video_with_watermark(path, new_audio_path, av_path, watermark= False) |
| | print(f'The generated video is named {video_save_dir}/{video_name}') |
| |
|
| | if 'full' in preprocess.lower(): |
| | |
| | video_name_full = x['video_name'] + '_full.mp4' |
| | full_video_path = os.path.join(video_save_dir, video_name_full) |
| | return_path = full_video_path |
| | paste_pic(path, pic_path, crop_info, new_audio_path, full_video_path, extended_crop= True if 'ext' in preprocess.lower() else False) |
| | print(f'The generated video is named {video_save_dir}/{video_name_full}') |
| | else: |
| | full_video_path = av_path |
| |
|
| | |
| | if enhancer: |
| | video_name_enhancer = x['video_name'] + '_enhanced.mp4' |
| | enhanced_path = os.path.join(video_save_dir, 'temp_'+video_name_enhancer) |
| | av_path_enhancer = os.path.join(video_save_dir, video_name_enhancer) |
| | return_path = av_path_enhancer |
| |
|
| | try: |
| | enhanced_images_gen_with_len = enhancer_generator_with_len(full_video_path, method=enhancer, bg_upsampler=background_enhancer) |
| | imageio.mimsave(enhanced_path, enhanced_images_gen_with_len, fps=float(25)) |
| | except: |
| | enhanced_images_gen_with_len = enhancer_list(full_video_path, method=enhancer, bg_upsampler=background_enhancer) |
| | imageio.mimsave(enhanced_path, enhanced_images_gen_with_len, fps=float(25)) |
| | |
| | save_video_with_watermark(enhanced_path, new_audio_path, av_path_enhancer, watermark= False) |
| | print(f'The generated video is named {video_save_dir}/{video_name_enhancer}') |
| | os.remove(enhanced_path) |
| |
|
| | os.remove(path) |
| | os.remove(new_audio_path) |
| |
|
| | return return_path |
| |
|
| |
|