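"""
Inference script for finetrainers.

Runs text-to-image, text-to-video, or image-to-video generation with a supported model
specification, optionally under context parallelism with the PyTorch distributed ("ptd")
backend.

Illustrative launch command (the script path and argument values are examples; the flags
themselves are defined by the Args classes below):

    torchrun --nproc_per_node 2 inference.py \
        --model_name wan \
        --pretrained_model_name_or_path <path-or-hub-id> \
        --inference_type text_to_video \
        --dataset_file prompts.json \
        --parallel_backend ptd \
        --cp_degree 2 \
        --output_dir outputs
"""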
import argparse
import json
import os
import time
import traceback
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union

import datasets.distributed
import torch
import wandb
from diffusers.hooks import HookRegistry, ModelHook
from diffusers.utils import export_to_video

from finetrainers import data, get_logger, logging, parallel, patches, utils
from finetrainers.args import AttentionProviderInference
from finetrainers.config import ModelType
from finetrainers.models import ModelSpecification, attention_provider
from finetrainers.models.cogvideox import CogVideoXModelSpecification
from finetrainers.models.cogview4 import CogView4ModelSpecification
from finetrainers.models.flux import FluxModelSpecification
from finetrainers.models.wan import WanModelSpecification
from finetrainers.parallel import ParallelBackendEnum
from finetrainers.state import ParallelBackendType
from finetrainers.utils import ArgsConfigMixin


logger = get_logger()


def main():
    try:
        import multiprocessing

        multiprocessing.set_start_method("fork")
    except Exception as e:
        logger.error(
            f'Failed to set multiprocessing start method to "fork". This can lead to poor performance, high memory usage, or crashes. '
            f"See: https://pytorch.org/docs/stable/notes/multiprocessing.html\n"
            f"Error: {e}"
        )

    try:
        args = BaseArgs()
        args.parse_args()

        model_specification_cls = get_model_specification_cls(args.model_name, args.inference_type)
        model_specification = model_specification_cls(
            pretrained_model_name_or_path=args.pretrained_model_name_or_path,
            tokenizer_id=args.tokenizer_id,
            tokenizer_2_id=args.tokenizer_2_id,
            tokenizer_3_id=args.tokenizer_3_id,
            text_encoder_id=args.text_encoder_id,
            text_encoder_2_id=args.text_encoder_2_id,
            text_encoder_3_id=args.text_encoder_3_id,
            transformer_id=args.transformer_id,
            vae_id=args.vae_id,
            text_encoder_dtype=args.text_encoder_dtype,
            text_encoder_2_dtype=args.text_encoder_2_dtype,
            text_encoder_3_dtype=args.text_encoder_3_dtype,
            transformer_dtype=args.transformer_dtype,
            vae_dtype=args.vae_dtype,
            revision=args.revision,
            cache_dir=args.cache_dir,
        )

        inferencer = Inference(args, model_specification)
        inferencer.run()

    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt. Exiting...")
    except Exception as e:
        logger.error(f"An error occurred during training: {e}")
        logger.error(traceback.format_exc())


class InferenceType(str, Enum):
    TEXT_TO_IMAGE = "text_to_image"
    TEXT_TO_VIDEO = "text_to_video"
    IMAGE_TO_VIDEO = "image_to_video"


# We do a union because every ArgsConfigMixin registered to BaseArgs can be looked up using the `__getattribute__` override
BaseArgsType = Union[
    "BaseArgs",
    "ParallelArgs",
    "ModelArgs",
    "InferenceArgs",
    "AttentionProviderArgs",
    "TorchConfigArgs",
    "MiscellaneousArgs",
]

_DTYPE_MAP = {
    "bf16": torch.bfloat16,
    "fp16": torch.float16,
    "fp32": torch.float32,
    "float8_e4m3fn": torch.float8_e4m3fn,
    "float8_e5m2": torch.float8_e5m2,
}
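
# The keys above are the dtype strings accepted by the CLI flags (e.g. `--transformer_dtype bf16`);
# ModelArgs.map_args uses this mapping to convert them to torch dtypes.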


SUPPORTED_MODEL_CONFIGS = {
    ModelType.COGVIDEOX: {
        InferenceType.TEXT_TO_VIDEO: CogVideoXModelSpecification,
    },
    ModelType.COGVIEW4: {
        InferenceType.TEXT_TO_IMAGE: CogView4ModelSpecification,
    },
    ModelType.FLUX: {
        InferenceType.TEXT_TO_IMAGE: FluxModelSpecification,
    },
    ModelType.WAN: {
        InferenceType.TEXT_TO_VIDEO: WanModelSpecification,
        InferenceType.IMAGE_TO_VIDEO: WanModelSpecification,
    },
}
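
# Example lookup: SUPPORTED_MODEL_CONFIGS[ModelType.WAN][InferenceType.TEXT_TO_VIDEO] is WanModelSpecification.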


def get_model_specification_cls(model_name: str, inference_type: InferenceType) -> Type[ModelSpecification]:
    """
    Get the model specification class for the given model name and inference type.
    """
    if model_name not in SUPPORTED_MODEL_CONFIGS:
        raise ValueError(
            f"Model {model_name} not supported. Supported models are: {list(SUPPORTED_MODEL_CONFIGS.keys())}"
        )
    if inference_type not in SUPPORTED_MODEL_CONFIGS[model_name]:
        raise ValueError(
            f"Inference type {inference_type} not supported for model {model_name}. Supported inference types are: {list(SUPPORTED_MODEL_CONFIGS[model_name].keys())}"
        )
    return SUPPORTED_MODEL_CONFIGS[model_name][inference_type]


@dataclass
class State:
    # Parallel state
    parallel_backend: Optional[ParallelBackendType] = None

    # Inference state
    generator: Optional[torch.Generator] = None


class Inference:
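    """Orchestrates distributed inference: loads the pipeline, applies context-parallel and
    attention-provider hooks, iterates over the dataset, and saves/logs generated artifacts."""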
    def __init__(self, args: BaseArgsType, model_specification: ModelSpecification):
        self.args = args
        self.model_specification = model_specification
        self.state = State()

        self.pipeline = None
        self.dataset = None
        self.dataloader = None

        self._init_distributed()
        self._init_config_options()

        patches.perform_patches_for_inference(args, self.state.parallel_backend)

    def run(self) -> None:
        try:
            self._prepare_pipeline()
            self._prepare_distributed()
            self._prepare_dataset()
            self._inference()
        except Exception as e:
            logger.error(f"Error during inference: {e}")
            self.state.parallel_backend.destroy()
            raise

    def _prepare_pipeline(self) -> None:
        logger.info("Initializing pipeline")

        transformer = self.model_specification.load_diffusion_models()["transformer"]
        self.pipeline = self.model_specification.load_pipeline(
            transformer=transformer,
            enable_slicing=self.args.enable_slicing,
            enable_tiling=self.args.enable_tiling,
            enable_model_cpu_offload=False,  # TODO(aryan): handle model/sequential/group offloading
            training=False,
        )

    def _prepare_distributed(self) -> None:
        parallel_backend = self.state.parallel_backend
        cp_mesh = parallel_backend.get_mesh("cp") if parallel_backend.context_parallel_enabled else None

        if cp_mesh is not None:
            parallel_backend.apply_context_parallel(self.pipeline.transformer, cp_mesh)

        registry = HookRegistry.check_if_exists_or_initialize(self.pipeline.transformer)
        hook = AttentionProviderHook(
            self.args.attn_provider, cp_mesh, self.args.cp_rotate_method, self.args.cp_reduce_precision
        )
        registry.register_hook(hook, "attn_provider")

        self._maybe_torch_compile()

        self._init_logging()
        self._init_trackers()
        self._init_directories()

    def _prepare_dataset(self) -> None:
        logger.info("Preparing dataset for inference")
        parallel_backend = self.state.parallel_backend

        dp_mesh = None
        if parallel_backend.data_replication_enabled:
            dp_mesh = parallel_backend.get_mesh("dp_replicate")
        if dp_mesh is not None:
            local_rank, dp_world_size = dp_mesh.get_local_rank(), dp_mesh.size()
        else:
            local_rank, dp_world_size = 0, 1

        dataset = data.ValidationDataset(self.args.dataset_file)
        dataset._data = datasets.distributed.split_dataset_by_node(dataset._data, local_rank, dp_world_size)
        dataloader = data.DPDataLoader(
            local_rank,
            dataset,
            batch_size=1,
            num_workers=0,  # TODO(aryan): handle dataloader_num_workers
            collate_fn=lambda items: items,
        )

        self.dataset = dataset
        self.dataloader = dataloader

    def _inference(self) -> None:
        parallel_backend = self.state.parallel_backend
        seed = self.args.seed if self.args.seed is not None else 0
        generator = torch.Generator(device=parallel_backend.device).manual_seed(seed)

        if parallel_backend._dp_degree > 1:
            dp_mesh = parallel_backend.get_mesh("dp")
            dp_local_rank, dp_world_size = dp_mesh.get_local_rank(), dp_mesh.size()
        else:
            dp_mesh = None
            dp_local_rank, dp_world_size = parallel_backend.local_rank, 1

        self.pipeline.to(parallel_backend.device)

        memory_statistics = utils.get_memory_statistics()
        logger.info(f"Memory before inference start: {json.dumps(memory_statistics, indent=4)}")

        data_iterator = iter(self.dataloader)
        main_process_prompts_to_filenames = {}  # Used to save model card
        all_processes_artifacts = []  # Used to gather artifacts from all processes

        while True:
            inference_data = next(data_iterator, None)
            if inference_data is None:
                break

            inference_data = inference_data[0]
            with torch.inference_mode():
                inference_artifacts = self.model_specification.validation(
                    pipeline=self.pipeline, generator=generator, **inference_data
                )

            if dp_local_rank != 0:
                continue

            PROMPT = inference_data["prompt"]
            IMAGE = inference_data.get("image", None)
            VIDEO = inference_data.get("video", None)
            EXPORT_FPS = inference_data.get("export_fps", 30)

            # 1. If there are any initial images or videos, log them to keep track of the
            # conditioning used for generation.
            prompt_filename = utils.string_to_filename(PROMPT)[:25]
            artifacts = {
                "input_image": data.ImageArtifact(value=IMAGE),
                "input_video": data.VideoArtifact(value=VIDEO),
            }

            # 2. Track the artifacts generated from inference
            for i, inference_artifact in enumerate(inference_artifacts):
                if inference_artifact.value is None:
                    continue
                artifacts.update({f"artifact_{i}": inference_artifact})

            # 3. Save the artifacts to the output directory and create appropriate logging objects
            # TODO(aryan): Currently, we only support WandB so we've hardcoded it here. Needs to be revisited.
            for index, (key, artifact) in enumerate(list(artifacts.items())):
                assert isinstance(artifact, (data.ImageArtifact, data.VideoArtifact))
                if artifact.value is None:
                    continue

                time_, rank, ext = int(time.time()), parallel_backend.rank, artifact.file_extension
                filename = f"inference-{rank}-{index}-{prompt_filename}-{time_}.{ext}"
                output_filename = os.path.join(self.args.output_dir, filename)

                if parallel_backend.is_main_process and ext in ["mp4", "jpg", "jpeg", "png"]:
                    main_process_prompts_to_filenames[PROMPT] = filename

                if isinstance(artifact, data.ImageArtifact):
                    artifact.value.save(output_filename)
                    all_processes_artifacts.append(wandb.Image(output_filename, caption=PROMPT))
                elif isinstance(artifact, data.VideoArtifact):
                    export_to_video(artifact.value, output_filename, fps=EXPORT_FPS)
                    all_processes_artifacts.append(wandb.Video(output_filename, caption=PROMPT))

        # 4. Cleanup & log artifacts
        parallel_backend.wait_for_everyone()

        memory_statistics = utils.get_memory_statistics()
        logger.info(f"Memory after inference end: {json.dumps(memory_statistics, indent=4)}")

        # Gather artifacts from all processes. We also need to flatten them since each process returns a list of artifacts.
        all_artifacts = [None] * dp_world_size
        if dp_world_size > 1:
            torch.distributed.all_gather_object(all_artifacts, all_processes_artifacts)
        else:
            all_artifacts = [all_processes_artifacts]
        all_artifacts = [artifact for artifacts in all_artifacts for artifact in artifacts]

        if parallel_backend.is_main_process:
            tracker_key = "inference"
            artifact_log_dict = {}

            image_artifacts = [artifact for artifact in all_artifacts if isinstance(artifact, wandb.Image)]
            if len(image_artifacts) > 0:
                artifact_log_dict["images"] = image_artifacts
            video_artifacts = [artifact for artifact in all_artifacts if isinstance(artifact, wandb.Video)]
            if len(video_artifacts) > 0:
                artifact_log_dict["videos"] = video_artifacts
            parallel_backend.log({tracker_key: artifact_log_dict}, step=0)

        parallel_backend.wait_for_everyone()

    def _init_distributed(self) -> None:
        world_size = int(os.environ.get("WORLD_SIZE", torch.cuda.device_count()))

        # TODO(aryan): handle other backends
        backend_cls: parallel.ParallelBackendType = parallel.get_parallel_backend_cls(self.args.parallel_backend)
        self.state.parallel_backend = backend_cls(
            world_size=world_size,
            pp_degree=self.args.pp_degree,
            dp_degree=self.args.dp_degree,
            dp_shards=self.args.dp_shards,
            cp_degree=self.args.cp_degree,
            tp_degree=self.args.tp_degree,
            backend="nccl",
            timeout=self.args.init_timeout,
            logging_dir=self.args.logging_dir,
            output_dir=self.args.output_dir,
        )

        if self.args.seed is not None:
            self.state.parallel_backend.enable_determinism(self.args.seed)

    def _init_logging(self) -> None:
        logging._set_parallel_backend(self.state.parallel_backend)
        logging.set_dependency_log_level(self.args.verbose, self.state.parallel_backend.is_local_main_process)
        logger.info("Initialized Finetrainers")

    def _init_trackers(self) -> None:
        # TODO(aryan): handle multiple trackers
        trackers = [self.args.report_to]
        experiment_name = self.args.tracker_name or "finetrainers-inference"
        self.state.parallel_backend.initialize_trackers(
            trackers, experiment_name=experiment_name, config=self.args.to_dict(), log_dir=self.args.logging_dir
        )

    def _init_directories(self) -> None:
        if self.state.parallel_backend.is_main_process:
            self.args.output_dir = Path(self.args.output_dir)
            self.args.output_dir.mkdir(parents=True, exist_ok=True)

    def _init_config_options(self) -> None:
        # Enable TF32 for faster training on Ampere GPUs: https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices
        if self.args.allow_tf32 and torch.cuda.is_available():
            torch.backends.cuda.matmul.allow_tf32 = True
        torch.set_float32_matmul_precision(self.args.float32_matmul_precision)

    def _maybe_torch_compile(self):
        for model_name, compile_scope in zip(self.args.compile_modules, self.args.compile_scopes):
            model = getattr(self.pipeline, model_name, None)
            if model is not None:
                logger.info(f"Applying torch.compile to '{model_name}' with scope '{compile_scope}'.")
                compiled_model = utils.apply_compile(model, compile_scope)
                setattr(self.pipeline, model_name, compiled_model)


class AttentionProviderHook(ModelHook):
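    """Wraps the hooked module's forward pass so that every call runs under the configured
    attention provider (and, when a device mesh is provided, context-parallel attention)."""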
    def __init__(
        self,
        provider: str,
        mesh: Optional[torch.distributed.device_mesh.DeviceMesh] = None,
        rotate_method: str = "allgather",
        reduce_precision: bool = False,
    ):
        super().__init__()
        self.provider = provider
        self.mesh = mesh
        self.rotate_method = rotate_method
        self.convert_to_fp32 = not reduce_precision

    def new_forward(self, module: torch.nn.Module, *args, **kwargs) -> Any:
        with attention_provider(
            self.provider, mesh=self.mesh, convert_to_fp32=self.convert_to_fp32, rotate_method=self.rotate_method
        ):
            return self.fn_ref.original_forward(*args, **kwargs)


class ParallelArgs(ArgsConfigMixin):
    """
    Args:
        parallel_backend (`str`, defaults to "accelerate"):
            The parallel backend to use for inference. Choose between ['accelerate', 'ptd'].
        pp_degree (`int`, defaults to 1):
            The degree of pipeline parallelism.
        dp_degree (`int`, defaults to 1):
            The degree of data parallelism (number of model replicas).
        dp_shards (`int`, defaults to 1):
            The number of data parallel shards (number of model partitions).
        cp_degree (`int`, defaults to 1):
            The degree of context parallelism.
        tp_degree (`int`, defaults to 1):
            The degree of tensor parallelism.
        cp_rotate_method (`str`, defaults to "allgather"):
            The method used to rotate key/value shards across context-parallel ranks. Choose between ['allgather', 'alltoall'].
        cp_reduce_precision (`bool`, defaults to `False`):
            Whether to allow reduced-precision accumulation in context-parallel attention instead of upcasting to fp32.
    """

    parallel_backend: ParallelBackendEnum = ParallelBackendEnum.ACCELERATE
    pp_degree: int = 1
    dp_degree: int = 1
    dp_shards: int = 1
    cp_degree: int = 1
    tp_degree: int = 1
    cp_rotate_method: str = "allgather"
    cp_reduce_precision: bool = False
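
    # These degrees size the backend's device mesh; typically their product
    # (pp * dp * dp_shards * cp * tp) must equal the launched world size. With the
    # current `validate_args` below, only the 'ptd' backend passes validation and
    # only `cp_degree` may exceed 1.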

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--parallel_backend", type=str, default="accelerate", choices=["accelerate", "ptd"])
        parser.add_argument("--pp_degree", type=int, default=1)
        parser.add_argument("--dp_degree", type=int, default=1)
        parser.add_argument("--dp_shards", type=int, default=1)
        parser.add_argument("--cp_degree", type=int, default=1)
        parser.add_argument("--tp_degree", type=int, default=1)
        parser.add_argument("--cp_rotate_method", type=str, default="allgather", choices=["allgather", "alltoall"])
        parser.add_argument("--cp_reduce_precision", action="store_true")

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        mapped_args.parallel_backend = argparse_args.parallel_backend
        mapped_args.pp_degree = argparse_args.pp_degree
        mapped_args.dp_degree = argparse_args.dp_degree
        mapped_args.dp_shards = argparse_args.dp_shards
        mapped_args.cp_degree = argparse_args.cp_degree
        mapped_args.tp_degree = argparse_args.tp_degree
        mapped_args.cp_rotate_method = argparse_args.cp_rotate_method
        mapped_args.cp_reduce_precision = argparse_args.cp_reduce_precision

    def validate_args(self, args: "BaseArgs"):
        if args.parallel_backend != "ptd":
            raise ValueError("Only 'ptd' parallel backend is supported for now.")
        if any(x > 1 for x in [args.pp_degree, args.dp_degree, args.dp_shards, args.tp_degree]):
            raise ValueError("Parallel degrees must be 1 except for `cp_degree` for now.")


class ModelArgs(ArgsConfigMixin):
    """
    Args:
        model_name (`str`):
            Name of the model to run inference with.
        pretrained_model_name_or_path (`str`):
            Path to pretrained model or model identifier from https://huggingface.co/models. The model should be
            loadable based on specified `model_name`.
        revision (`str`, defaults to `None`):
            If provided, the model will be loaded from a specific branch of the model repository.
        variant (`str`, defaults to `None`):
            Variant of model weights to use. Some models provide weight variants, such as `fp16`, to reduce disk
            storage requirements.
        cache_dir (`str`, defaults to `None`):
            The directory where the downloaded models and datasets will be stored, or loaded from.
        tokenizer_id (`str`, defaults to `None`):
            Identifier for the tokenizer model. This is useful when using a different tokenizer than the default from `pretrained_model_name_or_path`.
        tokenizer_2_id (`str`, defaults to `None`):
            Identifier for the second tokenizer model. This is useful when using a different tokenizer than the default from `pretrained_model_name_or_path`.
        tokenizer_3_id (`str`, defaults to `None`):
            Identifier for the third tokenizer model. This is useful when using a different tokenizer than the default from `pretrained_model_name_or_path`.
        text_encoder_id (`str`, defaults to `None`):
            Identifier for the text encoder model. This is useful when using a different text encoder than the default from `pretrained_model_name_or_path`.
        text_encoder_2_id (`str`, defaults to `None`):
            Identifier for the second text encoder model. This is useful when using a different text encoder than the default from `pretrained_model_name_or_path`.
        text_encoder_3_id (`str`, defaults to `None`):
            Identifier for the third text encoder model. This is useful when using a different text encoder than the default from `pretrained_model_name_or_path`.
        transformer_id (`str`, defaults to `None`):
            Identifier for the transformer model. This is useful when using a different transformer model than the default from `pretrained_model_name_or_path`.
        vae_id (`str`, defaults to `None`):
            Identifier for the VAE model. This is useful when using a different VAE model than the default from `pretrained_model_name_or_path`.
        text_encoder_dtype (`torch.dtype`, defaults to `torch.bfloat16`):
            Data type for the text encoder when generating text embeddings.
        text_encoder_2_dtype (`torch.dtype`, defaults to `torch.bfloat16`):
            Data type for the text encoder 2 when generating text embeddings.
        text_encoder_3_dtype (`torch.dtype`, defaults to `torch.bfloat16`):
            Data type for the text encoder 3 when generating text embeddings.
        transformer_dtype (`torch.dtype`, defaults to `torch.bfloat16`):
            Data type for the transformer model.
        vae_dtype (`torch.dtype`, defaults to `torch.bfloat16`):
            Data type for the VAE model.
        layerwise_upcasting_modules (`List[str]`, defaults to `[]`):
            Modules that should have fp8 storage weights but higher precision computation. Choose between ['transformer'].
        layerwise_upcasting_storage_dtype (`torch.dtype`, defaults to `float8_e4m3fn`):
            Data type for the layerwise upcasting storage. Choose between ['float8_e4m3fn', 'float8_e5m2'].
        layerwise_upcasting_skip_modules_pattern (`List[str]`, defaults to `["patch_embed", "pos_embed", "x_embedder", "context_embedder", "time_embed", "^proj_in$", "^proj_out$", "norm"]`):
            Modules to skip for layerwise upcasting. Layers such as normalization and modulation, when cast to fp8 precision
            naively (as done in layerwise upcasting), can lead to poorer training and inference quality. We skip these layers
            by default, and recommend adding more layers to the default list based on the model architecture.
        enable_slicing (`bool`, defaults to `False`):
            Whether to enable VAE slicing.
        enable_tiling (`bool`, defaults to `False`):
            Whether to enable VAE tiling.
    """

    model_name: str = None
    pretrained_model_name_or_path: str = None
    revision: Optional[str] = None
    variant: Optional[str] = None
    cache_dir: Optional[str] = None
    tokenizer_id: Optional[str] = None
    tokenizer_2_id: Optional[str] = None
    tokenizer_3_id: Optional[str] = None
    text_encoder_id: Optional[str] = None
    text_encoder_2_id: Optional[str] = None
    text_encoder_3_id: Optional[str] = None
    transformer_id: Optional[str] = None
    vae_id: Optional[str] = None
    text_encoder_dtype: torch.dtype = torch.bfloat16
    text_encoder_2_dtype: torch.dtype = torch.bfloat16
    text_encoder_3_dtype: torch.dtype = torch.bfloat16
    transformer_dtype: torch.dtype = torch.bfloat16
    vae_dtype: torch.dtype = torch.bfloat16
    layerwise_upcasting_modules: List[str] = []
    layerwise_upcasting_storage_dtype: torch.dtype = torch.float8_e4m3fn
    # fmt: off
    layerwise_upcasting_skip_modules_pattern: List[str] = ["patch_embed", "pos_embed", "x_embedder", "context_embedder", "time_embed", "^proj_in$", "^proj_out$", "norm"]
    # fmt: on
    enable_slicing: bool = False
    enable_tiling: bool = False

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--model_name", type=str, required=True, choices=[x.value for x in ModelType.__members__.values()]
        )
        parser.add_argument("--pretrained_model_name_or_path", type=str, required=True)
        parser.add_argument("--revision", type=str, default=None, required=False)
        parser.add_argument("--variant", type=str, default=None)
        parser.add_argument("--cache_dir", type=str, default=None)
        parser.add_argument("--tokenizer_id", type=str, default=None)
        parser.add_argument("--tokenizer_2_id", type=str, default=None)
        parser.add_argument("--tokenizer_3_id", type=str, default=None)
        parser.add_argument("--text_encoder_id", type=str, default=None)
        parser.add_argument("--text_encoder_2_id", type=str, default=None)
        parser.add_argument("--text_encoder_3_id", type=str, default=None)
        parser.add_argument("--transformer_id", type=str, default=None)
        parser.add_argument("--vae_id", type=str, default=None)
        parser.add_argument("--text_encoder_dtype", type=str, default="bf16")
        parser.add_argument("--text_encoder_2_dtype", type=str, default="bf16")
        parser.add_argument("--text_encoder_3_dtype", type=str, default="bf16")
        parser.add_argument("--transformer_dtype", type=str, default="bf16")
        parser.add_argument("--vae_dtype", type=str, default="bf16")
        parser.add_argument("--layerwise_upcasting_modules", type=str, default=[], nargs="+", choices=["transformer"])
        parser.add_argument(
            "--layerwise_upcasting_storage_dtype",
            type=str,
            default="float8_e4m3fn",
            choices=["float8_e4m3fn", "float8_e5m2"],
        )
        parser.add_argument(
            "--layerwise_upcasting_skip_modules_pattern",
            type=str,
            default=["patch_embed", "pos_embed", "x_embedder", "context_embedder", "time_embed", "^proj_in$", "^proj_out$", "norm"],
            nargs="+",
        )
        parser.add_argument("--enable_slicing", action="store_true")
        parser.add_argument("--enable_tiling", action="store_true")

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        mapped_args.model_name = argparse_args.model_name
        mapped_args.pretrained_model_name_or_path = argparse_args.pretrained_model_name_or_path
        mapped_args.revision = argparse_args.revision
        mapped_args.variant = argparse_args.variant
        mapped_args.cache_dir = argparse_args.cache_dir
        mapped_args.tokenizer_id = argparse_args.tokenizer_id
        mapped_args.tokenizer_2_id = argparse_args.tokenizer_2_id
        mapped_args.tokenizer_3_id = argparse_args.tokenizer_3_id
        mapped_args.text_encoder_id = argparse_args.text_encoder_id
        mapped_args.text_encoder_2_id = argparse_args.text_encoder_2_id
        mapped_args.text_encoder_3_id = argparse_args.text_encoder_3_id
        mapped_args.transformer_id = argparse_args.transformer_id
        mapped_args.vae_id = argparse_args.vae_id
        mapped_args.text_encoder_dtype = _DTYPE_MAP[argparse_args.text_encoder_dtype]
        mapped_args.text_encoder_2_dtype = _DTYPE_MAP[argparse_args.text_encoder_2_dtype]
        mapped_args.text_encoder_3_dtype = _DTYPE_MAP[argparse_args.text_encoder_3_dtype]
        mapped_args.transformer_dtype = _DTYPE_MAP[argparse_args.transformer_dtype]
        mapped_args.vae_dtype = _DTYPE_MAP[argparse_args.vae_dtype]
        mapped_args.layerwise_upcasting_modules = argparse_args.layerwise_upcasting_modules
        mapped_args.layerwise_upcasting_storage_dtype = _DTYPE_MAP[argparse_args.layerwise_upcasting_storage_dtype]
        mapped_args.layerwise_upcasting_skip_modules_pattern = argparse_args.layerwise_upcasting_skip_modules_pattern
        mapped_args.enable_slicing = argparse_args.enable_slicing
        mapped_args.enable_tiling = argparse_args.enable_tiling

    def validate_args(self, args: "BaseArgs"):
        pass


class InferenceArgs(ArgsConfigMixin):
    """
    Args:
        inference_type (`str`):
            The type of inference to run. Choose between ['text_to_image', 'text_to_video', 'image_to_video'].
        dataset_file (`str`, defaults to `None`):
            Path to a CSV/JSON/PARQUET/ARROW file containing information for inference. The file must contain at least the
            "caption" column. Other columns such as "image_path" and "video_path" can be provided too. If provided, "image_path"
            will be used to load a PIL.Image.Image and set the "image" key in the sample dictionary. Similarly, "video_path"
            will be used to load a List[PIL.Image.Image] and set the "video" key in the sample dictionary.
            The dataset file may contain other attributes such as:
                - "height" and "width" and "num_frames": Resolution
                - "num_inference_steps": Number of inference steps
                - "guidance_scale": Classifier-free Guidance Scale
                - ... (any number of additional attributes can be provided. The ModelSpecification::validate method will be
                invoked with the sample dictionary to validate the sample.)
    """

    inference_type: InferenceType = InferenceType.TEXT_TO_VIDEO
    dataset_file: str = None

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--inference_type",
            type=str,
            default=InferenceType.TEXT_TO_VIDEO.value,
            choices=[x.value for x in InferenceType.__members__.values()],
        )
        parser.add_argument("--dataset_file", type=str, required=True)

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        mapped_args.inference_type = InferenceType(argparse_args.inference_type)
        mapped_args.dataset_file = argparse_args.dataset_file

    def validate_args(self, args: "BaseArgs"):
        pass


class AttentionProviderArgs(ArgsConfigMixin):
    """
    Args:
        attn_provider (`str`, defaults to "native"):
            The attention provider to use for inference. Choose between ['flash', 'flash_varlen', 'flex', 'native', '_native_cudnn', '_native_efficient', '_native_flash', '_native_math', 'sage', 'sage_varlen', '_sage_qk_int8_pv_fp8_cuda', '_sage_qk_int8_pv_fp8_cuda_sm90', '_sage_qk_int8_pv_fp16_cuda', '_sage_qk_int8_pv_fp16_triton', 'xformers'].
    """

    attn_provider: AttentionProviderInference = "native"
    # attn_provider_specialized_modules: List[str] = []

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        # fmt: off
        parser.add_argument("--attn_provider", type=str, default="native", choices=["flash", "flash_varlen", "flex", "native", "_native_cudnn", "_native_efficient", "_native_flash", "_native_math", "sage", "sage_varlen", "_sage_qk_int8_pv_fp8_cuda", "_sage_qk_int8_pv_fp8_cuda_sm90", "_sage_qk_int8_pv_fp16_cuda", "_sage_qk_int8_pv_fp16_triton", "xformers"])
        # fmt: on

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        mapped_args.attn_provider = argparse_args.attn_provider

    def validate_args(self, args: "BaseArgs"):
        pass


class TorchConfigArgs(ArgsConfigMixin):
    """
    Args:
        compile_modules (`List[str]`, defaults to `[]`):
            Names of pipeline modules that should be compiled with `torch.compile`.
        compile_scopes (`List[str]`, defaults to `None`):
            The scope of compilation for each entry in `--compile_modules`. Choose between ['regional', 'full']. Must have
            the same length as `--compile_modules`. If `None`, defaults to `regional` for all modules.
        allow_tf32 (`bool`, defaults to `False`):
            Whether or not to allow the use of TF32 matmul on compatible hardware.
        float32_matmul_precision (`str`, defaults to `highest`):
            The precision to use for float32 matmul. Choose between ['highest', 'high', 'medium'].
    """

    compile_modules: List[str] = []
    compile_scopes: List[str] = None
    allow_tf32: bool = False
    float32_matmul_precision: str = "highest"
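
    # Illustrative usage: `--compile_modules transformer --compile_scopes regional`.
    # A single scope is broadcast to all modules in `--compile_modules` (see `map_args` below).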

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--compile_modules", type=str, default=[], nargs="+")
        parser.add_argument("--compile_scopes", type=str, default=None, nargs="+")
        parser.add_argument("--allow_tf32", action="store_true")
        parser.add_argument(
            "--float32_matmul_precision",
            type=str,
            default="highest",
            choices=["highest", "high", "medium"],
            help="The precision to use for float32 matmul. Choose between ['highest', 'high', 'medium'].",
        )

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        compile_scopes = argparse_args.compile_scopes
        if len(argparse_args.compile_modules) > 0:
            if compile_scopes is None:
                compile_scopes = "regional"
            if isinstance(compile_scopes, list) and len(compile_scopes) == 1:
                compile_scopes = compile_scopes[0]
            if isinstance(compile_scopes, str):
                compile_scopes = [compile_scopes] * len(argparse_args.compile_modules)
        else:
            compile_scopes = []

        mapped_args.compile_modules = argparse_args.compile_modules
        mapped_args.compile_scopes = compile_scopes
        mapped_args.allow_tf32 = argparse_args.allow_tf32
        mapped_args.float32_matmul_precision = argparse_args.float32_matmul_precision

    def validate_args(self, args: "BaseArgs"):
        if len(args.compile_modules) > 0:
            assert len(args.compile_modules) == len(args.compile_scopes) and all(
                x in ["regional", "full"] for x in args.compile_scopes
            ), (
                "Compile modules and compile scopes must be of the same length and compile scopes must be either 'regional' or 'full'"
            )


class MiscellaneousArgs(ArgsConfigMixin):
    """
    Args:
        seed (`int`, defaults to `None`):
            Random seed for reproducibility under same initialization conditions.
        tracker_name (`str`, defaults to `finetrainers`):
            Name of the tracker/project to use for logging inference metrics.
        output_dir (`str`, defaults to `finetrainers-inference`):
            The directory where the generated artifacts and logs will be stored.
        logging_dir (`str`, defaults to `logs`):
            The directory where the logs will be stored.
        init_timeout (`int`, defaults to `300`):
            Timeout, in seconds, for initializing the parallel backend.
        nccl_timeout (`int`, defaults to `600`):
            Timeout, in seconds, for NCCL communication.
        report_to (`str`, defaults to `none`):
            The name of the logger to use for logging inference metrics. Choose between ['none', 'wandb'].
        verbose (`int`, defaults to `0`):
            Whether or not to print verbose logs.
                - 0: Diffusers/Transformers warning logging on local main process only
                - 1: Diffusers/Transformers info logging on local main process only
                - 2: Diffusers/Transformers debug logging on local main process only
                - 3: Diffusers/Transformers debug logging on all processes
    """

    seed: Optional[int] = None
    tracker_name: str = "finetrainers-inference"
    output_dir: str = None
    logging_dir: Optional[str] = "logs"
    init_timeout: int = 300  # 5 minutes
    nccl_timeout: int = 600  # 10 minutes
    report_to: str = "wandb"
    verbose: int = 1

    def add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--seed", type=int, default=None)
        parser.add_argument("--tracker_name", type=str, default="finetrainers")
        parser.add_argument("--output_dir", type=str, default="finetrainers-inference")
        parser.add_argument("--logging_dir", type=str, default="logs")
        parser.add_argument("--init_timeout", type=int, default=300)
        parser.add_argument("--nccl_timeout", type=int, default=600)
        parser.add_argument("--report_to", type=str, default="none", choices=["none", "wandb"])
        parser.add_argument("--verbose", type=int, default=0, choices=[0, 1, 2, 3])

    def map_args(self, argparse_args: argparse.Namespace, mapped_args: "BaseArgs"):
        mapped_args.seed = argparse_args.seed
        mapped_args.tracker_name = argparse_args.tracker_name
        mapped_args.output_dir = argparse_args.output_dir
        mapped_args.logging_dir = argparse_args.logging_dir
        mapped_args.init_timeout = argparse_args.init_timeout
        mapped_args.nccl_timeout = argparse_args.nccl_timeout
        mapped_args.report_to = argparse_args.report_to
        mapped_args.verbose = argparse_args.verbose

    def validate_args(self, args: "BaseArgs"):
        pass


class BaseArgs:
    """The arguments for the finetrainers inference script."""

    parallel_args = ParallelArgs()
    model_args = ModelArgs()
    inference_args = InferenceArgs()
    attention_provider_args = AttentionProviderArgs()
    torch_config_args = TorchConfigArgs()
    miscellaneous_args = MiscellaneousArgs()

    _registered_config_mixins: List[ArgsConfigMixin] = []
    _arg_group_map: Dict[str, ArgsConfigMixin] = {}

    def __init__(self):
        # Use an instance-level registry so repeated BaseArgs instantiations do not keep
        # appending to the shared class-level list.
        self._registered_config_mixins: List[ArgsConfigMixin] = []
        self._arg_group_map: Dict[str, ArgsConfigMixin] = {
            "parallel_args": self.parallel_args,
            "model_args": self.model_args,
            "inference_args": self.inference_args,
            "attention_provider_args": self.attention_provider_args,
            "torch_config_args": self.torch_config_args,
            "miscellaneous_args": self.miscellaneous_args,
        }

        for arg_config_mixin in self._arg_group_map.values():
            self.register_args(arg_config_mixin)

    def to_dict(self) -> Dict[str, Any]:
        arguments_to_dict = {}
        for config_mixin in self._registered_config_mixins:
            arguments_to_dict[config_mixin.__class__.__name__] = config_mixin.to_dict()

        return arguments_to_dict

    def register_args(self, config: ArgsConfigMixin) -> None:
        if not hasattr(self, "_extended_add_arguments"):
            self._extended_add_arguments = []
        self._extended_add_arguments.append((config.add_args, config.validate_args, config.map_args))
        self._registered_config_mixins.append(config)

    def parse_args(self):
        parser = argparse.ArgumentParser()

        for extended_add_arg_fns in getattr(self, "_extended_add_arguments", []):
            add_fn, _, _ = extended_add_arg_fns
            add_fn(parser)

        args, remaining_args = parser.parse_known_args()
        logger.debug(f"Remaining unparsed arguments: {remaining_args}")

        for extended_add_arg_fns in getattr(self, "_extended_add_arguments", []):
            _, _, map_fn = extended_add_arg_fns
            map_fn(args, self)

        for extended_add_arg_fns in getattr(self, "_extended_add_arguments", []):
            _, validate_fn, _ = extended_add_arg_fns
            validate_fn(self)

        return self

    def __getattribute__(self, name: str):
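        """Fall back to the registered arg groups, so that e.g. `args.cp_degree` resolves to
        `self.parallel_args.cp_degree` without flattening the groups onto this object."""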
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            for arg_group in self._arg_group_map.values():
                if hasattr(arg_group, name):
                    return getattr(arg_group, name)
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    def __setattr__(self, name: str, value: Any):
        if name in self.__dict__:
            object.__setattr__(self, name, value)
            return
        for arg_group in self._arg_group_map.values():
            if hasattr(arg_group, name):
                setattr(arg_group, name, value)
                return
        object.__setattr__(self, name, value)


if __name__ == "__main__":
    main()