George-API committed (verified)
Commit 3e9eac8 · Parent(s): 7ca0d01

Upload run_cloud_training.py with huggingface_hub

Files changed (1):
  1. run_cloud_training.py +97 -25
run_cloud_training.py CHANGED
@@ -27,7 +27,47 @@ from transformers import AutoTokenizer, TrainingArguments, Trainer, AutoModelFor
 from transformers.data.data_collator import DataCollatorMixin
 from peft import LoraConfig
 from unsloth import FastLanguageModel
-import deepspeed
+
+# Try to import deepspeed and install mpi4py if needed
+try:
+    import deepspeed
+except ImportError:
+    logger.error("DeepSpeed not found. Installing deepspeed...")
+    try:
+        import subprocess
+        subprocess.check_call([sys.executable, "-m", "pip", "install", "deepspeed"])
+        import deepspeed
+        logger.info("DeepSpeed installed successfully")
+    except Exception as e:
+        logger.error(f"Failed to install DeepSpeed: {e}")
+        logger.error("Will continue without DeepSpeed")
+
+# Check for mpi4py which is required by DeepSpeed
+try:
+    import mpi4py
+    logger.info(f"mpi4py is available (version: {mpi4py.__version__})")
+except ImportError:
+    logger.warning("mpi4py not found. Installing mpi4py which is required for DeepSpeed...")
+    try:
+        import subprocess
+        # First try to install OpenMPI if on a Linux system
+        if sys.platform.startswith('linux'):
+            try:
+                logger.info("Attempting to install OpenMPI system dependencies...")
+                subprocess.check_call(["apt-get", "update", "-y"])
+                subprocess.check_call(["apt-get", "install", "-y", "libopenmpi-dev", "openmpi-bin"])
+                logger.info("OpenMPI installed successfully")
+            except Exception as e:
+                logger.warning(f"Failed to install OpenMPI system dependencies: {e}")
+                logger.warning("Will try to install mpi4py anyway")
+
+        # Now install mpi4py
+        subprocess.check_call([sys.executable, "-m", "pip", "install", "mpi4py>=3.1.4"])
+        import mpi4py
+        logger.info(f"mpi4py installed successfully (version: {mpi4py.__version__})")
+    except Exception as e:
+        logger.error(f"Failed to install mpi4py: {e}")
+        logger.error("DeepSpeed may not work correctly without mpi4py")
 
 # Disable all attention optimizations that might cause issues
 os.environ["TRANSFORMERS_NO_FLASH_ATTENTION"] = "1"
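
Note on the hunk above: it installs missing packages at runtime with pip (plus apt-get for OpenMPI, which only works with root access). As a rough sketch only, and not part of this commit, the same import-or-install pattern can be factored into a helper; `ensure_package` is a hypothetical name, and the module-level `logger` mirrors the one the script already configures.

    # Illustrative sketch (not from run_cloud_training.py): import a module,
    # pip-installing it once on ImportError. `ensure_package` is hypothetical.
    import importlib
    import logging
    import subprocess
    import sys
    from typing import Optional

    logger = logging.getLogger(__name__)

    def ensure_package(module_name: str, pip_spec: Optional[str] = None):
        """Return the imported module, installing it with pip if it is missing."""
        try:
            return importlib.import_module(module_name)
        except ImportError:
            logger.warning("%s not found; installing %s", module_name, pip_spec or module_name)
            subprocess.check_call([sys.executable, "-m", "pip", "install", pip_spec or module_name])
            importlib.invalidate_caches()  # make the freshly installed package importable
            return importlib.import_module(module_name)

    # Mirrors the order in the diff: mpi4py first (DeepSpeed depends on it), then deepspeed.
    mpi4py = ensure_package("mpi4py", "mpi4py>=3.1.4")
    deepspeed = ensure_package("deepspeed")
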
@@ -581,31 +621,51 @@ def train(config_path, dataset_name, output_dir):
     if deepspeed_config:
         logger.info("DeepSpeed configuration found - enabling DeepSpeed for distributed training")
 
-        # Create a temporary DeepSpeed config file
-        ds_config_path = os.path.join(output_dir, "ds_config_temp.json")
-
-        # Update DeepSpeed config with dynamic values
-        if isinstance(deepspeed_config.get("train_micro_batch_size_per_gpu"), str) and deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
-            deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
+        # Check if mpi4py is available
+        mpi4py_available = False
+        try:
+            import mpi4py
+            mpi4py_available = True
+        except ImportError:
+            logger.error("mpi4py is required for DeepSpeed but not available")
+            logger.error("Will continue without DeepSpeed")
 
-        if isinstance(deepspeed_config.get("train_batch_size"), str) and deepspeed_config.get("train_batch_size") == "auto":
-            deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count
-
-        # Write the DeepSpeed config to a file
-        with open(ds_config_path, 'w') as f:
-            json.dump(deepspeed_config, f, indent=2)
-
-        logger.info(f"Created DeepSpeed config at {ds_config_path}")
-        logger.info(f"DeepSpeed ZeRO Stage: {deepspeed_config.get('zero_optimization', {}).get('stage', 'Not specified')}")
-
-        # Enable CPU offloading if configured
-        if deepspeed_config.get("zero_optimization", {}).get("offload_optimizer", {}).get("device") == "cpu":
-            logger.info("DeepSpeed CPU offloading enabled for optimizer states")
-
-        # Set using_deepspeed flag
-        using_deepspeed = True
+        if mpi4py_available:
+            try:
+                # Create a temporary DeepSpeed config file
+                ds_config_path = os.path.join(output_dir, "ds_config_temp.json")
+
+                # Update DeepSpeed config with dynamic values
+                if isinstance(deepspeed_config.get("train_micro_batch_size_per_gpu"), str) and deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
+                    deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
+
+                if isinstance(deepspeed_config.get("train_batch_size"), str) and deepspeed_config.get("train_batch_size") == "auto":
+                    deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count
+
+                # Write the DeepSpeed config to a file
+                with open(ds_config_path, 'w') as f:
+                    json.dump(deepspeed_config, f, indent=2)
+
+                logger.info(f"Created DeepSpeed config at {ds_config_path}")
+                logger.info(f"DeepSpeed ZeRO Stage: {deepspeed_config.get('zero_optimization', {}).get('stage', 'Not specified')}")
+
+                # Enable CPU offloading if configured
+                if deepspeed_config.get("zero_optimization", {}).get("offload_optimizer", {}).get("device") == "cpu":
+                    logger.info("DeepSpeed CPU offloading enabled for optimizer states")
+
+                # Set using_deepspeed flag
+                using_deepspeed = True
+            except Exception as e:
+                logger.error(f"Failed to initialize DeepSpeed: {e}")
+                logger.error("Will continue without DeepSpeed")
+                ds_config_path = None
+                using_deepspeed = False
+        else:
+            ds_config_path = None
+            using_deepspeed = False
     else:
         logger.warning("No DeepSpeed configuration found - continuing without DeepSpeed")
+        ds_config_path = None
         using_deepspeed = False
 
     # Initialize model with our safe loading function
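
For context on the "auto" substitution above: the DeepSpeed config itself comes from the training config file and is not shown in this diff, so the dict below is a hypothetical example of what it might contain and of how the new code resolves the "auto" batch-size fields.

    # Hypothetical DeepSpeed config with "auto" batch sizes; the real one lives
    # in the repo's training config and is not part of this diff.
    import json

    per_device_train_batch_size = 4   # placeholder values
    gpu_count = 2

    deepspeed_config = {
        "zero_optimization": {
            "stage": 2,
            "offload_optimizer": {"device": "cpu"},   # matches the CPU-offload log branch
        },
        "train_micro_batch_size_per_gpu": "auto",
        "train_batch_size": "auto",
    }

    # Same substitution the new code performs before writing ds_config_temp.json.
    if deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
        deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
    if deepspeed_config.get("train_batch_size") == "auto":
        deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count

    print(json.dumps(deepspeed_config, indent=2))
    # -> train_micro_batch_size_per_gpu: 4, train_batch_size: 8

DeepSpeed defines train_batch_size as micro batch size × gradient_accumulation_steps × world size, so the per_device_train_batch_size * gpu_count formula above (and in the diff) implicitly assumes gradient_accumulation_steps of 1.
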
@@ -649,6 +709,7 @@ def train(config_path, dataset_name, output_dir):
         reports = ["none"]
         logger.warning("No reporting backends available - training metrics won't be logged")
 
+    # Prepare training arguments
     training_args_dict = {
         "output_dir": output_dir,
         "num_train_epochs": training_config.get("num_train_epochs", 3),
@@ -671,9 +732,20 @@ def train(config_path, dataset_name, output_dir)
         "remove_unused_columns": False,
         "seed": 42,
         "dataloader_num_workers": 4,  # Use multiple workers for data loading
-        "deepspeed": ds_config_path  # Add DeepSpeed config path if available
     }
-
+
+    # Add DeepSpeed config if available and mpi4py is installed
+    if using_deepspeed and ds_config_path is not None:
+        logger.info("Using DeepSpeed for training")
+        training_args_dict["deepspeed"] = ds_config_path
+    else:
+        logger.info("Not using DeepSpeed - falling back to standard distributed training")
+        # If DeepSpeed is not available, ensure we're still using distributed training efficiently
+        if gpu_count > 1:
+            logger.info(f"Using standard distributed training with {gpu_count} GPUs")
+            training_args_dict["local_rank"] = int(os.environ.get("LOCAL_RANK", -1))
+            training_args_dict["gradient_checkpointing"] = True
+
     # Create TrainingArguments with validated parameters
     training_args = TrainingArguments(**training_args_dict)
 
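
As a rough sketch of the fallback path added in the last hunk (placeholder values, not the ones computed in run_cloud_training.py): when DeepSpeed is unavailable, the dict is passed to TrainingArguments without a "deepspeed" entry, and multi-GPU runs rely on the launcher-provided LOCAL_RANK plus gradient checkpointing.

    # Minimal sketch of the non-DeepSpeed branch; values are placeholders.
    import os
    from transformers import TrainingArguments

    gpu_count = 2            # hypothetical; the script derives this from the environment
    using_deepspeed = False
    ds_config_path = None

    training_args_dict = {
        "output_dir": "./output",
        "num_train_epochs": 3,
        "per_device_train_batch_size": 4,
        "remove_unused_columns": False,
        "seed": 42,
        "dataloader_num_workers": 4,
    }

    if using_deepspeed and ds_config_path is not None:
        training_args_dict["deepspeed"] = ds_config_path
    elif gpu_count > 1:
        # torchrun/accelerate export LOCAL_RANK per process; -1 means single process.
        training_args_dict["local_rank"] = int(os.environ.get("LOCAL_RANK", -1))
        training_args_dict["gradient_checkpointing"] = True

    training_args = TrainingArguments(**training_args_dict)

Either way the dict is unpacked directly into TrainingArguments, where an unknown key would raise a TypeError, which is why the diff only adds "deepspeed" when a config file was actually written.
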