Spaces: Running on Zero
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Callable

import numpy as np
import soundfile as sf
import torch
import torchvision
from moviepy import VideoFileClip, AudioFileClip
# from moviepy.audio.AudioClip import AudioArrayClip
# from moviepy.audio.fx import AudioLoop
def merge_audio_video(
    audio: str | Path | np.ndarray,
    video_path: str | Path,
    target_path: str | Path,
    backend: str = "moviepy",
    logging: bool = False,
    audio_fps: int | None = None
):
    """
    Merge an audio track and a video file into a single output file.

    Args:
        audio (str | Path | np.ndarray): Path to the audio file, or a raw
            waveform array. When an array is given, ``audio_fps`` must be
            set so the samples can be written to a temporary WAV first.
        video_path (str | Path): Path to the input video file.
        target_path (str | Path): Path of the merged output file.
        backend (str, optional): "moviepy" (re-encodes the video) or
            "ffmpeg" (stream-copies both tracks). Defaults to "moviepy".
        logging (bool, optional): When True, show backend progress output.
        audio_fps (int | None, optional): Sample rate of ``audio`` when it
            is an ndarray; ignored for path inputs.
    """
    assert backend in [
        "moviepy", "ffmpeg"
    ], "Backend should be moviepy or ffmpeg"
    tmp_wav_path: str | None = None
    try:
        if isinstance(audio, np.ndarray):
            # Raw samples: write them to a temp WAV so both backends can
            # consume a plain file path. (Previously only the moviepy
            # backend handled ndarray input; ffmpeg got str(array).)
            assert audio_fps is not None, \
                "audio_fps is required when audio is an ndarray"
            with tempfile.NamedTemporaryFile(
                suffix=".wav", delete=False
            ) as tmp_wav:
                tmp_wav_path = tmp_wav.name
            sf.write(tmp_wav_path, audio, samplerate=audio_fps)
            audio_path = tmp_wav_path
        else:
            audio_path = str(audio)

        if backend == "moviepy":
            video = VideoFileClip(str(video_path)).without_audio()
            audio_clip = AudioFileClip(audio_path)
            video = video.with_audio(audio_clip)
            # moviepy expects a str path here; pass str for consistency
            # with the other clip constructors above.
            video.write_videofile(
                str(target_path),
                logger=None if not logging else "bar",
                threads=8,
                preset="ultrafast",
                ffmpeg_params=["-crf", "23"]
            )
            video.close()
            audio_clip.close()
        else:
            # Stream-copy both tracks. List-form argv (no shell) avoids
            # injection/quoting bugs with paths containing spaces/quotes,
            # which the previous os.system string was vulnerable to.
            command = ["ffmpeg"]
            if not logging:
                command += ["-loglevel", "quiet"]
            command += [
                "-i", str(video_path),
                "-i", audio_path,
                "-c:v", "copy", "-c:a", "copy",
                "-map", "0:v:0", "-map", "1:a:0",
                str(target_path),
            ]
            subprocess.run(command, check=False)
    finally:
        # Always remove the temp WAV, even if merging raised.
        if tmp_wav_path is not None and os.path.exists(tmp_wav_path):
            os.remove(tmp_wav_path)
| def read_video_frames( | |
| video_path: str, | |
| duration: float | None = 10.0, | |
| fps: int = 10, | |
| video_size: tuple[int] = (256, 256), | |
| resize_transform: Callable | None = None, | |
| ): | |
| try: | |
| video, _, meta = torchvision.io.read_video( | |
| str(video_path), start_pts=0, end_pts=duration, pts_unit='sec' | |
| ) | |
| video_duration = video.shape[0] / meta["video_fps"] | |
| if duration and video_duration < duration: | |
| num_frames, height, width, channels = video.shape | |
| padding_length = int(duration * meta["video_fps"]) - num_frames | |
| padding = torch.zeros((padding_length, height, width, channels), | |
| dtype=video.dtype) | |
| video = torch.cat([video, padding], dim=0) | |
| target_length = int(duration * fps) | |
| else: | |
| target_length = int(video_duration * fps) | |
| indices = torch.linspace(0, video.shape[0] - 1, | |
| steps=target_length).long() | |
| video = video[indices] | |
| video = video.permute(0, 3, 1, 2) # [T, C, H, W] | |
| if resize_transform is None: | |
| resize_transform = torchvision.transforms.Resize(video_size) | |
| video = resize_transform(video) | |
| return video | |
| except Exception as e: | |
| print(f"error reading video {video_path}: {e}") | |
| assert duration is not None | |
| target_length = int(duration * fps) | |
| return torch.zeros(target_length, 3, *video_size) | |