""" |
|
|
LDF Model for Hugging Face Hub |
|
|
|
|
|
Usage: |
|
|
from transformers import AutoModel |
|
|
|
|
|
model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True) |
|
|
motion = model("a person walking forward", length=60) |
|
|
""" |
|
|
|
|
|
import os
import sys
from typing import List, Optional, Union

import torch
from transformers import PretrainedConfig, PreTrainedModel


class LDFConfig(PretrainedConfig):
    """Configuration for the LDF Motion Generation Model."""

    model_type = "ldf_motion"

    def __init__(
        self,
        input_dim=4,
        output_dim=263,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim


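# For reference, serializing LDFConfig with save_pretrained() produces a config.json
# roughly like the sketch below. The two custom fields come from the defaults above;
# the remaining entries (transformers version, auto_map, etc.) are added by
# PretrainedConfig and will vary:
#
#   {
#     "model_type": "ldf_motion",
#     "input_dim": 4,
#     "output_dim": 263,
#     ...
#   }

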
class LDFModel(PreTrainedModel):
    """
    LDF Motion Generation Model.

    This model generates motion sequences from text descriptions using Latent Diffusion Forcing.

    Example:
        >>> from transformers import AutoModel
        >>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
        >>> motion = model("a person walking forward", length=60)
        >>> print(motion.shape)  # (~240, 263)
    """

    config_class = LDFConfig

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # The underlying LDF and VAE models are loaded lazily by _load_models().
        self.ldf_model = None
        self.vae = None
        self.model_dir = None

    def _load_models(self):
        """Load the actual LDF and VAE models (no-op if already loaded)."""
        if self.ldf_model is not None:
            return

        if hasattr(self, 'name_or_path') and os.path.exists(self.name_or_path):
            model_dir = self.name_or_path
        else:
            raise RuntimeError(
                "Model directory not found. Please use from_pretrained() to load the model."
            )

        self.model_dir = model_dir

        # Make the code bundled with the checkpoint importable.
        if model_dir not in sys.path:
            sys.path.insert(0, model_dir)

        import importlib
        generate_ldf = importlib.import_module('generate_ldf')
        load_model_from_config = generate_ldf.load_model_from_config

        # load_model_from_config() reads its settings from the command line, so
        # temporarily patch sys.argv to point it at the bundled ldf.yaml.
        config_path = os.path.join(model_dir, "ldf.yaml")
        old_argv = sys.argv
        sys.argv = ['model', '--config', config_path]

        try:
            self.vae, self.ldf_model = load_model_from_config()

            device = (
                next(self.parameters()).device
                if list(self.parameters())
                else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            )
            self.ldf_model = self.ldf_model.to(device)
            self.vae = self.vae.to(device)
        finally:
            sys.argv = old_argv

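    # For reference, the lookups in this class expect the model directory to contain
    # at least the following files (other repository contents are not required by
    # this wrapper):
    #
    #   config.json                    - LDFConfig, read by from_pretrained()
    #   ldf.yaml                       - configuration consumed by load_model_from_config()
    #   generate_ldf.py                - provides load_model_from_config()
    #   ldf_utils/motion_process.py    - provides StreamJointRecovery263 (joint recovery)
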
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load a pretrained model.

        Args:
            pretrained_model_name_or_path: Model name or path
            trust_remote_code: Must be True to load this custom model
            **kwargs: Additional arguments

        Returns:
            LDFModel instance
        """
        if not kwargs.get('trust_remote_code', False):
            raise ValueError(
                "Loading this model requires trust_remote_code=True. "
                "Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
            )

        # Resolve a Hub repo id to a local snapshot if the path does not exist locally.
        if not os.path.exists(pretrained_model_name_or_path):
            from huggingface_hub import snapshot_download
            model_path = snapshot_download(repo_id=pretrained_model_name_or_path)
        else:
            model_path = pretrained_model_name_or_path

        config = LDFConfig.from_pretrained(model_path)

        model = cls(config)
        model.name_or_path = model_path

        model._load_models()

        return model

    def forward(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        **kwargs
    ):
        """
        Generate motion from text.

        Args:
            text: Text description(s)
            length: Number of latent tokens (output frames ≈ length × 4)
            text_end: Transition points for multi-text
            num_denoise_steps: Number of denoising steps

        Returns:
            Generated motion sequence(s)
        """
        return self.__call__(text, length, text_end, num_denoise_steps)

    @torch.no_grad()
    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        output_joints: bool = False,
        smoothing_alpha: float = 1.0
    ):
        """
        Generate motion sequences.

        Args:
            text: Text description
                - Single string: "walk" -> single sample
                - String list: ["walk", "run"] -> batch
                - Nested list: [["walk", "turn"], ["run", "jump"]] -> multi-text per sample
            length: Number of latent tokens (frames ≈ length × 4)
            text_end: Token positions for text switching
            num_denoise_steps: Number of denoising steps
            output_joints: If True, output 22×3 joint coordinates;
                if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions
                (0.0-1.0; default 1.0 = no smoothing)
                - Only used when output_joints=True
                - Recommended: 0.5 for smoother animations

        Returns:
            numpy.ndarray or list of arrays
            - If output_joints=False: shape (frames, 263)
            - If output_joints=True: shape (frames, 22, 3)
        """
        self._load_models()

        # Normalize single-sample inputs to batched form.
        is_single = not isinstance(length, list)
        if is_single:
            text_batch = [text]
            length_batch = [length]
            text_end_batch = [text_end] if text_end is not None else None
        else:
            text_batch = text
            length_batch = length
            text_end_batch = text_end

        # Each multi-text sample must provide one text_end endpoint per text segment.
        if text_end_batch is not None:
            for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
                if isinstance(txt, list) and te is not None:
                    if len(txt) != len(te):
                        raise ValueError(
                            f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
                            f"They must match! text={txt}, text_end={te}"
                        )

        batch_size = len(text_batch)

        x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
        if text_end_batch is not None:
            x["feature_text_end"] = text_end_batch

|
|
|
output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps) |
|
|
generated_batch = output["generated"] |
|
|
|
|
|
|
|
|
decoded_results = [] |
|
|
joints_results = [] if output_joints else None |
|
|
|
|
|
|
|
|
if output_joints: |
|
|
import importlib.util |
|
|
import numpy as np |
|
|
utils_spec = importlib.util.spec_from_file_location( |
|
|
"motion_process", |
|
|
os.path.join(self.model_dir, "ldf_utils", "motion_process.py") |
|
|
) |
|
|
motion_process_module = importlib.util.module_from_spec(utils_spec) |
|
|
utils_spec.loader.exec_module(motion_process_module) |
|
|
|
|
|
        for i, generated in enumerate(generated_batch):
            if generated is not None and torch.is_tensor(generated):
                # Decode latents back to 263-dim HumanML3D features.
                decoded_g = self.vae.decode(generated[None, :])[0]

                if output_joints:
                    # Recover 22×3 joint positions frame by frame, with optional EMA smoothing.
                    decoded_np = decoded_g.cpu().numpy()
                    recovery = motion_process_module.StreamJointRecovery263(
                        joints_num=22, smoothing_alpha=smoothing_alpha
                    )
                    joints = [recovery.process_frame(frame) for frame in decoded_np]
                    joints = np.array(joints)
                    joints_results.append(joints)
                else:
                    decoded_results.append(decoded_g.cpu().numpy())
            else:
                if output_joints:
                    joints_results.append(None)
                else:
                    decoded_results.append(None)

        if output_joints:
            return joints_results[0] if is_single else joints_results
        else:
            return decoded_results[0] if is_single else decoded_results

    def generate(self, *args, **kwargs):
        """Alias for __call__ to match the transformers API."""
        return self.__call__(*args, **kwargs)


LDFPipeline = LDFModel


# Register with the Auto classes so AutoConfig/AutoModel can resolve "ldf_motion".
try:
    from transformers import AutoModel, AutoConfig
    AutoConfig.register("ldf_motion", LDFConfig)
    AutoModel.register(LDFConfig, LDFModel)
except Exception:
    pass
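

# Minimal illustrative usage sketch, kept under a __main__ guard so it never runs on
# import. The call patterns follow the docstrings above; the prompts, lengths, and
# text_end values below are made-up examples rather than required inputs.
if __name__ == "__main__":
    from transformers import AutoModel

    model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)

    # Single prompt: 60 latent tokens decode to roughly 240 frames of 263-dim features.
    motion = model("a person walking forward", length=60)
    print(motion.shape)

    # Batched multi-text sample: switch prompts at the latent-token positions in text_end.
    motions = model(
        [["a person walks forward", "the person turns around"]],
        length=[80],
        text_end=[[40, 80]],
    )

    # 22×3 joint coordinates with EMA smoothing instead of raw HumanML3D features.
    joints = model("a person jumps", length=60, output_joints=True, smoothing_alpha=0.5)
    print(joints.shape)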