"""
LDF Model for Hugging Face Hub

Usage:
    from transformers import AutoModel

    model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
    motion = model("a person walking forward", length=60)
"""

import torch
from transformers import PretrainedConfig, PreTrainedModel
from typing import Union, List, Optional
import os
import sys


class LDFConfig(PretrainedConfig):
    """Configuration for LDF Motion Generation Model

    Args:
        input_dim: Stored on the config as ``input_dim`` (default 4).
        output_dim: Stored on the config as ``output_dim`` (default 263).
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "ldf_motion"

    def __init__(self, input_dim=4, output_dim=263, **kwargs):
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim


class LDFModel(PreTrainedModel):
    """
    LDF Motion Generation Model

    This model generates motion sequences from text descriptions using Latent Diffusion Forcing.

    Example:
        >>> from transformers import AutoModel
        >>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
        >>> motion = model("a person walking forward", length=60)
        >>> print(motion.shape)  # (~240, 263)
    """

    config_class = LDFConfig

    def __init__(self, config):
        """Create an empty wrapper; the real sub-models are loaded lazily."""
        super().__init__(config)
        self.config = config

        # Populated by _load_models() (called from from_pretrained / __call__)
        self.ldf_model = None
        self.vae = None
        self.model_dir = None  # Model directory, reused for output_joints conversion

    def _load_models(self):
        """Load the actual LDF and VAE models (no-op when already loaded).

        Raises:
            RuntimeError: If ``name_or_path`` is missing or does not point to
                an existing path.
        """
        if self.ldf_model is not None:
            return  # Already loaded

        # The model directory should have been set by from_pretrained
        model_dir = getattr(self, 'name_or_path', None)
        if not model_dir or not os.path.exists(model_dir):
            raise RuntimeError(
                "Model directory not found. Please use from_pretrained() to load the model."
            )

        # Save model_dir for later use (e.g., in output_joints conversion)
        self.model_dir = model_dir

        # Make the modules shipped in the model directory importable
        if model_dir not in sys.path:
            sys.path.insert(0, model_dir)

        # Use dynamic import to avoid HF's static import checker
        import importlib
        generate_ldf = importlib.import_module('generate_ldf')
        load_model_from_config = generate_ldf.load_model_from_config

        config_path = os.path.join(model_dir, "ldf.yaml")

        # load_model_from_config() takes no arguments here, so the config path
        # is handed over via sys.argv; the original argv is always restored.
        old_argv = sys.argv
        sys.argv = ['model', '--config', config_path]

        try:
            self.vae, self.ldf_model = load_model_from_config()

            # Use the device of this module's first parameter when it has any;
            # otherwise fall back to CUDA if available, else CPU.
            first_param = next(self.parameters(), None)
            if first_param is not None:
                device = first_param.device
            else:
                device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.ldf_model = self.ldf_model.to(device)
            self.vae = self.vae.to(device)
        finally:
            sys.argv = old_argv

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load pretrained model

        Args:
            pretrained_model_name_or_path: Local path, or a Hub repo id that is
                downloaded with ``huggingface_hub.snapshot_download``
            trust_remote_code: Must be True to load this custom model
            **kwargs: Additional arguments. ``revision``, ``cache_dir``,
                ``token`` and ``local_files_only`` are forwarded to the
                download when given (and not None); everything else is ignored.

        Returns:
            LDFModel instance

        Raises:
            ValueError: If ``trust_remote_code`` is not truthy.
        """
        # Check trust_remote_code
        if not kwargs.get('trust_remote_code', False):
            raise ValueError(
                "Loading this model requires trust_remote_code=True. "
                "Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
            )

        # Download if needed
        if not os.path.exists(pretrained_model_name_or_path):
            from huggingface_hub import snapshot_download

            # Honour the caller's hub options instead of silently dropping them
            download_kwargs = {
                key: kwargs[key]
                for key in ("revision", "cache_dir", "token", "local_files_only")
                if kwargs.get(key) is not None
            }
            model_path = snapshot_download(
                repo_id=pretrained_model_name_or_path, **download_kwargs
            )
        else:
            model_path = pretrained_model_name_or_path

        # Load config
        config = LDFConfig.from_pretrained(model_path)

        # Create model
        model = cls(config)
        model.name_or_path = model_path

        # Load the actual models
        model._load_models()

        return model

    def forward(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        **kwargs
    ):
        """
        Generate motion from text

        Args:
            text: Text description(s)
            length: Number of latent tokens (output frames ≈ length × 4)
            text_end: Transition points for multi-text
            num_denoise_steps: Number of denoising steps
            **kwargs: ``output_joints`` and ``smoothing_alpha`` are forwarded
                to ``__call__``; any other keyword is ignored.

        Returns:
            Generated motion sequence(s)
        """
        # Forward only the options __call__ understands, so unknown keywords
        # remain harmless as they were before.
        extra = {
            key: kwargs[key]
            for key in ("output_joints", "smoothing_alpha")
            if key in kwargs
        }
        return self.__call__(text, length, text_end, num_denoise_steps, **extra)

    @torch.no_grad()
    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        output_joints: bool = False,
        smoothing_alpha: float = 1.0
    ):
        """
        Generate motion sequences

        Args:
            text: Text description
                - Single string: "walk" -> single sample
                - String list: ["walk", "run"] -> batch
                - Nested list: [["walk", "turn"], ["run", "jump"]] -> multi-text per sample
            length: Number of latent tokens (frames ≈ length × 4). Passing a
                list selects batch mode; anything else is treated as one sample.
            text_end: Token positions for text switching
            num_denoise_steps: Number of denoising steps
            output_joints: If True, output 22×3 joint coordinates; if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions (0.0-1.0, default=1.0 no smoothing)
                - Only used when output_joints=True
                - Recommended: 0.5 for smoother animations

        Returns:
            numpy.ndarray or list of arrays (an entry is None when the model
            produced no tensor for that sample)
                - If output_joints=False: shape (frames, 263)
                - If output_joints=True: shape (frames, 22, 3)

        Raises:
            ValueError: If a sample's text segments and text_end endpoints
                differ in count.
        """
        # Ensure models are loaded
        self._load_models()

        # Normalize inputs: batch mode is decided solely by the type of `length`
        is_single = not isinstance(length, list)
        if is_single:
            text_batch = [text]
            length_batch = [length]
            text_end_batch = [text_end] if text_end is not None else None
        else:
            text_batch = text
            length_batch = length
            text_end_batch = text_end

        # Validate text_end alignment with text
        if text_end_batch is not None:
            for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
                if isinstance(txt, list) and te is not None:
                    if len(txt) != len(te):
                        raise ValueError(
                            f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
                            f"They must match! text={txt}, text_end={te}"
                        )

        # Construct input dict for model
        x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
        if text_end_batch is not None:
            x["feature_text_end"] = text_end_batch

        # Non-streaming generate (following generate_ldf.py 125-139)
        output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps)
        generated_batch = output["generated"]

        # Import motion processing module once if needed
        if output_joints:
            import importlib.util
            import numpy as np
            utils_spec = importlib.util.spec_from_file_location(
                "motion_process",
                os.path.join(self.model_dir, "ldf_utils", "motion_process.py")
            )
            motion_process_module = importlib.util.module_from_spec(utils_spec)
            utils_spec.loader.exec_module(motion_process_module)

        # Decode with VAE and optionally convert to joints; one entry per sample
        results = []
        for generated in generated_batch:
            if generated is None or not torch.is_tensor(generated):
                results.append(None)
                continue

            # Decode with VAE (following generate_ldf.py line 130)
            decoded_np = self.vae.decode(generated[None, :])[0].cpu().numpy()

            if output_joints:
                # Convert to joints using StreamJointRecovery263 with smoothing.
                # A new recovery instance per sample keeps state independent.
                recovery = motion_process_module.StreamJointRecovery263(
                    joints_num=22, smoothing_alpha=smoothing_alpha
                )
                results.append(
                    np.array([recovery.process_frame(frame) for frame in decoded_np])
                )
            else:
                results.append(decoded_np)

        # Return results
        return results[0] if is_single else results

    def generate(self, *args, **kwargs):
        """Alias for __call__ to match transformers API"""
        return self.__call__(*args, **kwargs)


# For backwards compatibility
LDFPipeline = LDFModel


# Register with AutoModel
try:
    from transformers import AutoModel, AutoConfig
    AutoConfig.register("ldf_motion", LDFConfig)
    AutoModel.register(LDFConfig, LDFModel)
except Exception:
    # Registration is best-effort (it may fail, e.g. if the model type is
    # already registered); the classes remain usable directly either way.
    pass