"""
LDF Model for Hugging Face Hub

Usage:
    from transformers import AutoModel
    
    model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
    motion = model("a person walking forward", length=60)
"""

import torch
from transformers import PretrainedConfig, PreTrainedModel
from typing import Union, List, Optional
import os
import sys


class LDFConfig(PretrainedConfig):
    """Configuration for LDF Motion Generation Model"""
    model_type = "ldf_motion"
    
    def __init__(
        self,
        input_dim=4,
        output_dim=263,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.output_dim = output_dim


class LDFModel(PreTrainedModel):
    """
    LDF Motion Generation Model
    
    This model generates motion sequences from text descriptions using Latent Diffusion Forcing.
    
    Example:
        >>> from transformers import AutoModel
        >>> model = AutoModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)
        >>> motion = model("a person walking forward", length=60)
        >>> print(motion.shape)  # (~240, 263)
    """
    
    config_class = LDFConfig
    
    def __init__(self, config):
        super().__init__(config)
        self.config = config
        
        # Will be loaded in from_pretrained
        self.ldf_model = None
        self.vae = None
        self.model_dir = None  # Store model directory for later use
    
    def _load_models(self):
        """Load the actual LDF and VAE models"""
        if self.ldf_model is not None:
            return  # Already loaded
        
        # Get the model directory - should be set by from_pretrained
        if hasattr(self, 'name_or_path') and os.path.exists(self.name_or_path):
            model_dir = self.name_or_path
        else:
            raise RuntimeError(
                "Model directory not found. Please use from_pretrained() to load the model."
            )
        
        # Save model_dir for later use (e.g., in output_joints conversion)
        self.model_dir = model_dir
        
        # Add model_dir to sys.path for imports
        if model_dir not in sys.path:
            sys.path.insert(0, model_dir)
        
        # Use dynamic import to avoid HF's static import checker
        import importlib
        generate_ldf = importlib.import_module('generate_ldf')
        load_model_from_config = generate_ldf.load_model_from_config
        
        config_path = os.path.join(model_dir, "ldf.yaml")
        
        # load_model_from_config reads its options from the command line, so
        # temporarily patch sys.argv to point it at the bundled ldf.yaml
        # (restored in the finally block below).
        old_argv = sys.argv
        sys.argv = ['model', '--config', config_path]
        
        try:
            self.vae, self.ldf_model = load_model_from_config()
            
            # Move the loaded models to this wrapper's device
            # (fall back to CUDA if available, else CPU)
            params = list(self.parameters())
            device = params[0].device if params else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            self.ldf_model = self.ldf_model.to(device)
            self.vae = self.vae.to(device)
        finally:
            sys.argv = old_argv
    
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """
        Load pretrained model
        
        Args:
            pretrained_model_name_or_path: Model name or path
            trust_remote_code: Must be True to load this custom model
            **kwargs: Additional arguments
        
        Returns:
            LDFModel instance
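        
        Example:
            A local checkout can be loaded directly (the path below is illustrative):
            
            >>> model = LDFModel.from_pretrained("./FloodDiffusion", trust_remote_code=True)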
        """
        # Check trust_remote_code
        if not kwargs.get('trust_remote_code', False):
            raise ValueError(
                "Loading this model requires trust_remote_code=True. "
                "Usage: AutoModel.from_pretrained(..., trust_remote_code=True)"
            )
        
        # Download if needed
        if not os.path.exists(pretrained_model_name_or_path):
            from huggingface_hub import snapshot_download
            model_path = snapshot_download(repo_id=pretrained_model_name_or_path)
        else:
            model_path = pretrained_model_name_or_path
        
        # Load config
        config = LDFConfig.from_pretrained(model_path)
        
        # Create model
        model = cls(config)
        model.name_or_path = model_path
        
        # Load the actual models
        model._load_models()
        
        return model
    
    def forward(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        **kwargs
    ):
        """
        Generate motion from text
        
        Args:
            text: Text description(s)
            length: Number of latent tokens (output frames ≈ length × 4)
            text_end: Transition points for multi-text
            num_denoise_steps: Number of denoising steps
        
        Returns:
            Generated motion sequence(s)
        """
        return self.__call__(text, length, text_end, num_denoise_steps, **kwargs)
    
    @torch.no_grad()
    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        length: Union[int, List[int]] = 60,
        text_end: Optional[Union[List[int], List[List[int]]]] = None,
        num_denoise_steps: Optional[int] = None,
        output_joints: bool = False,
        smoothing_alpha: float = 1.0
    ):
        """
        Generate motion sequences
        
        Args:
            text: Text description(s)
                - Single string: "walk" -> single sample
                - String list (with a list of lengths): ["walk", "run"] -> batch of single-text samples
                - Nested list: [["walk", "turn"], ["run", "jump"]] -> multi-text per sample
            length: Number of latent tokens per sample (frames ≈ length × 4).
                Pass an int for a single sample, or a list with one entry per sample to enable batch mode.
            text_end: Latent-token positions at which the text switches; must have one endpoint per text segment
            num_denoise_steps: Number of denoising steps
            output_joints: If True, output 22×3 joint coordinates; if False (default), output 263-dim HumanML3D features
            smoothing_alpha: EMA smoothing factor for joint positions (0.0-1.0, default=1.0 no smoothing)
                - Only used when output_joints=True
                - Recommended: 0.5 for smoother animations
        
        Returns:
            numpy.ndarray or list of arrays
                - If output_joints=False: shape (frames, 263)
                - If output_joints=True: shape (frames, 22, 3)
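        
        Example:
            A sketch of a multi-text call (prompts and transition points are illustrative):
            
            >>> joints = model(
            ...     ["a person walks forward", "the person turns around"],
            ...     length=60,
            ...     text_end=[30, 60],
            ...     output_joints=True,
            ...     smoothing_alpha=0.5,
            ... )
            >>> joints.shape  # roughly (240, 22, 3)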
        """
        # Ensure models are loaded
        self._load_models()
        
        # Normalize inputs: batch mode is keyed off `length` being a list
        # (one length per sample); a scalar length means a single sample.
        is_single = not isinstance(length, list)
        if is_single:
            text_batch = [text]
            length_batch = [length]
            text_end_batch = [text_end] if text_end is not None else None
        else:
            text_batch = text
            length_batch = length
            text_end_batch = text_end
        
        # Validate text_end alignment with text
        if text_end_batch is not None:
            for i, (txt, te) in enumerate(zip(text_batch, text_end_batch)):
                if isinstance(txt, list) and te is not None:
                    if len(txt) != len(te):
                        raise ValueError(
                            f"Batch {i}: text has {len(txt)} segments but text_end has {len(te)} endpoints. "
                            f"They must match! text={txt}, text_end={te}"
                        )
        
        batch_size = len(text_batch)
        
        # Construct input dict for model
        x = {"feature_length": torch.tensor(length_batch), "text": text_batch}
        if text_end_batch is not None:
            x["feature_text_end"] = text_end_batch
        
        # Non-streaming generate (following generate_ldf.py 125-139)
        output = self.ldf_model.generate(x, num_denoise_steps=num_denoise_steps)
        generated_batch = output["generated"]
        
        # Decode with VAE and optionally convert to joints
        decoded_results = []
        joints_results = [] if output_joints else None
        
        # Import motion processing module once if needed
        if output_joints:
            import importlib.util
            import numpy as np
            utils_spec = importlib.util.spec_from_file_location(
                "motion_process",
                os.path.join(self.model_dir, "ldf_utils", "motion_process.py")
            )
            motion_process_module = importlib.util.module_from_spec(utils_spec)
            utils_spec.loader.exec_module(motion_process_module)
        
        for i, generated in enumerate(generated_batch):
            if generated is not None and torch.is_tensor(generated):
                # Decode with VAE (following generate_ldf.py line 130)
                decoded_g = self.vae.decode(generated[None, :])[0]
                
                if output_joints:
                    # Convert to joints using StreamJointRecovery263 with smoothing
                    # Create a new recovery instance for each sample to maintain independent state
                    decoded_np = decoded_g.cpu().numpy()
                    recovery = motion_process_module.StreamJointRecovery263(
                        joints_num=22, smoothing_alpha=smoothing_alpha
                    )
                    joints = [recovery.process_frame(frame) for frame in decoded_np]
                    joints = np.array(joints)
                    joints_results.append(joints)
                else:
                    decoded_results.append(decoded_g.cpu().numpy())
            else:
                if output_joints:
                    joints_results.append(None)
                else:
                    decoded_results.append(None)
        
        # Return results
        if output_joints:
            return joints_results[0] if is_single else joints_results
        else:
            return decoded_results[0] if is_single else decoded_results
    
    def generate(self, *args, **kwargs):
        """Alias for __call__ to match transformers API"""
        return self.__call__(*args, **kwargs)


# For backwards compatibility
LDFPipeline = LDFModel


# Register with AutoModel so AutoConfig/AutoModel resolve the "ldf_motion" model type
try:
    from transformers import AutoModel, AutoConfig
    AutoConfig.register("ldf_motion", LDFConfig)
    AutoModel.register(LDFConfig, LDFModel)
except Exception:
    # Best-effort registration; it can fail if the model type is already registered.
    pass
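

# Minimal local-usage sketch (illustrative, not part of the library API). It assumes
# the repository files (ldf.yaml, generate_ldf.py, checkpoints) are available locally
# or downloadable from the Hub repo "ShandaAI/FloodDiffusion".
if __name__ == "__main__":
    model = LDFModel.from_pretrained("ShandaAI/FloodDiffusion", trust_remote_code=True)

    # Single prompt: 60 latent tokens decode to roughly 240 frames of 263-dim features.
    motion = model("a person walking forward", length=60)
    print(motion.shape)

    # Batch mode: pass one length per sample; the result is a list of arrays.
    motions = model(["a person walks", "a person runs"], length=[60, 80])
    print([m.shape for m in motions if m is not None])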