"""
LDF Model for Hugging Face Hub
Usage:
from transformers import AutoModel
model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
motion = model("a person walking forward", length=60)
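
# Batch of samples (one text and one token length per sample):
motions = model(["a person walking", "a person jumping"], length=[60, 80])

# Joint output with optional EMA smoothing (see LDFModel.__call__ for details):
joints = model("a person walking forward", length=60, output_joints=True, smoothing_alpha=0.5)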
"""
import torch
from transformers import PretrainedConfig, PreTrainedModel
from typing import Union, List, Optional
import os
import sys
class LDFConfig(PretrainedConfig):
"""Configuration for LDF Motion Generation Model"""
model_type = "ldf_motion"
def __init__(
self,
input_dim=4,
output_dim=263,
**kwargs
):
super().__init__(**kwargs)
self.input_dim = input_dim
self.output_dim = output_dim
class LDFModel(PreTrainedModel):
"""
LDF Motion Generation Model
This model generates motion sequences from text descriptions using Latent Diffusion Forcing.
Example:
>>> from transformers import AutoModel
>>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
>>> motion = model("a person walking forward", length=60)
>>> print(motion.shape) # (~240, 263)
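    >>> joints = model("a person walking forward", length=60, output_joints=True)
    >>> print(joints.shape)  # (~240, 22, 3)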
"""
config_class = LDFConfig
def __init__(self, config):
super().__init__(config)
self.config = config
# Will be loaded in from_pretrained
self.ldf_model = None
self.vae = None
self.model_dir = None # Store model directory for later use
def _load_models(self):
"""Load the actual LDF and VAE models"""
if self.ldf_model is not None:
return # Already loaded
# Get the model directory - should be set by from_pretrained
if hasattr(self, 'name_or_path') and os.path.exists(self.name_or_path):
model_dir = self.name_or_path
else:
raise RuntimeError(
"Model directory not found. Please use from_pretrained() to load the model."
)
# Save model_dir for later use (e.g., in output_joints conversion)
self.model_dir = model_dir
# Add model_dir to sys.path for imports
if model_dir not in sys.path:
sys.path.insert(0, model_dir)
# Use dynamic import to avoid HF's static import checker
import importlib
generate_ldf = importlib.import_module('generate_ldf')
load_model_from_config = generate_ldf.load_model_from_config
config_path = os.path.join(model_dir, "ldf.yaml")
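        # load_model_from_config() parses its options from sys.argv, so point
        # argv at this repo's ldf.yaml for the duration of the call; the
        # original argv is restored in the finally block below.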
old_argv = sys.argv
sys.argv = ['model', '--config', config_path]
try:
self.vae, self.ldf_model = load_model_from_config()
# Move to correct device
            params = list(self.parameters())
            device = params[0].device if params else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.ldf_model = self.ldf_model.to(device)
self.vae = self.vae.to(device)
finally:
sys.argv = old_argv
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
"""
Load pretrained model
Args:
pretrained_model_name_or_path: Model name or path
trust_remote_code: Must be True to load this custom model
**kwargs: Additional arguments
Returns:
LDFModel instance
"""
# Check trust_remote_code
if not kwargs.get('trust_remote_code', False):
raise ValueError(
"Loading this model requires trust_remote_code=True. "
"Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
)
# Download if needed
if not os.path.exists(pretrained_model_name_or_path):
from huggingface_hub import snapshot_download
model_path = snapshot_download(repo_id=pretrained_model_name_or_path)
else:
model_path = pretrained_model_name_or_path
# Load config
config = LDFConfig.from_pretrained(model_path)
# Create model
model = cls(config)
model.name_or_path = model_path
# Load the actual models
model._load_models()
return model
def forward(
self,
text: Union[str, List[str], List[List[str]]],
length: Union[int, List[int]] = 60,
text_end: Optional[Union[List[int], List[List[int]]]] = None,
num_denoise_steps: Optional[int] = None,
**kwargs
):
"""
Generate motion from text
Args:
text: Text description(s)
length: Number of latent tokens (output frames ≈ length × 4)
text_end: Transition points for multi-text
num_denoise_steps: Number of denoising steps
Returns:
Generated motion sequence(s)
"""
        return self.__call__(text, length, text_end, num_denoise_steps, **kwargs)
@torch.no_grad()
def __call__(
self,
text: Union[str, List[str], List[List[str]]],
length: Union[int, List[int]] = 60,
text_end: Optional[Union[List[int], List[List[int]]]] = None,
num_denoise_steps: Optional[int] = None,
output_joints: bool = False,
smoothing_alpha: float = 1.0
):
"""
Generate motion sequences
Args:
text: Text description
- Single string: "walk" -> single sample
- String list: ["walk", "run"] -> batch
- Nested list: [["walk", "turn"], ["run", "jump"]] -> multi-text per sample
length: Number of latent tokens (frames ≈ length × 4)
text_end: Token positions for text switching
num_denoise_steps: Number of denoising steps
output_joints: If True, output 22×3 joint coordinates; if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions (0.0-1.0; default 1.0 means no smoothing)
- Only used when output_joints=True
- Recommended: 0.5 for smoother animations
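                - Assumed update rule: smoothed[t] = alpha * joints[t] + (1 - alpha) * smoothed[t-1]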
Returns:
numpy.ndarray or list of arrays
- If output_joints=False: shape (frames, 263)
- If output_joints=True: shape (frames, 22, 3)
"""
# Ensure models are loaded
self._load_models()
# Normalize inputs
is_single = not isinstance(length, list)
if is_single:
text_batch = [text]
length_batch = [length]
text_end_batch = [text_end] if text_end is not None else None
else:
text_batch = text
length_batch = length
text_end_batch = text_end
# Validate text_end alignment with text
if text_end_batch is not None:
for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
if isinstance(txt, list) and te is not None:
if len(txt) != len(te):
raise ValueError(
f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
f"They must match! text={txt}, text_end={te}"
)
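        # Valid pairing (illustrative): text=[["walk", "turn"]] with
        # text_end=[[40, 60]] gives two aligned segments in a 60-token sample.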
batch_size = len(text_batch)
# Construct input dict for model
x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
if text_end_batch is not None:
x["feature_text_end"] = text_end_batch
# Non-streaming generate (following generate_ldf.py 125-139)
output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps)
generated_batch = output["generated"]
# Decode with VAE and optionally convert to joints
decoded_results = []
joints_results = [] if output_joints else None
# Import motion processing module once if needed
if output_joints:
import importlib.util
import numpy as np
utils_spec = importlib.util.spec_from_file_location(
"motion_process",
os.path.join(self.model_dir, "ldf_utils", "motion_process.py")
)
motion_process_module = importlib.util.module_from_spec(utils_spec)
utils_spec.loader.exec_module(motion_process_module)
for i, generated in enumerate(generated_batch):
if generated is not None and torch.is_tensor(generated):
# Decode with VAE (following generate_ldf.py line 130)
decoded_g = self.vae.decode(generated[None, :])[0]
if output_joints:
# Convert to joints using StreamJointRecovery263 with smoothing
# Create a new recovery instance for each sample to maintain independent state
decoded_np = decoded_g.cpu().numpy()
recovery = motion_process_module.StreamJointRecovery263(
joints_num=22, smoothing_alpha=smoothing_alpha
)
joints = [recovery.process_frame(frame) for frame in decoded_np]
joints = np.array(joints)
joints_results.append(joints)
else:
decoded_results.append(decoded_g.cpu().numpy())
else:
if output_joints:
joints_results.append(None)
else:
decoded_results.append(None)
# Return results
if output_joints:
return joints_results[0] if is_single else joints_results
else:
return decoded_results[0] if is_single else decoded_results
def generate(self, *args, **kwargs):
"""Alias for __call__ to match transformers API"""
return self.__call__(*args, **kwargs)
# For backwards compatibility
LDFPipeline = LDFModel
# Register with AutoModel
try:
from transformers import AutoModel, AutoConfig
AutoConfig.register("ldf_motion", LDFConfig)
AutoModel.register(LDFConfig, LDFModel)
except Exception:
    # Best-effort registration: skip silently if AutoModel/AutoConfig are
    # unavailable or "ldf_motion" is already registered.
    pass
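

# Minimal smoke-test sketch (assumes network access to download the
# ShandaAI/FloodDiffusion checkpoint, or a local copy at that path).
if __name__ == "__main__":
    model = LDFModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)

    # 60 latent tokens decode to roughly 240 frames of 263-dim HumanML3D features.
    motion = model("a person walking forward", length=60)
    print("features:", motion.shape)

    # Joint output: (frames, 22, 3), with optional EMA smoothing.
    joints = model("a person walking forward", length=60,
                   output_joints=True, smoothing_alpha=0.5)
    print("joints:", joints.shape)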