George-API committed
Commit 494b544 · verified · 1 Parent(s): 15ea6e6

Upload run_cloud_training.py with huggingface_hub

Files changed (1): run_cloud_training.py (+68 -91)
run_cloud_training.py CHANGED
@@ -28,46 +28,32 @@ from transformers.data.data_collator import DataCollatorMixin
  from peft import LoraConfig
  from unsloth import FastLanguageModel
 
- # Try to import deepspeed and install mpi4py if needed
- try:
-     import deepspeed
- except ImportError:
-     logger.error("DeepSpeed not found. Installing deepspeed...")
-     try:
-         import subprocess
-         subprocess.check_call([sys.executable, "-m", "pip", "install", "deepspeed"])
-         import deepspeed
-         logger.info("DeepSpeed installed successfully")
-     except Exception as e:
-         logger.error(f"Failed to install DeepSpeed: {e}")
-         logger.error("Will continue without DeepSpeed")
+ # Set DeepSpeed environment variables to disable MPI
+ os.environ["MASTER_ADDR"] = "localhost"
+ os.environ["MASTER_PORT"] = "9994"
+ os.environ["RANK"] = "0"
+ os.environ["LOCAL_RANK"] = "0"
+ os.environ["WORLD_SIZE"] = "1"
 
- # Check for mpi4py which is required by DeepSpeed
+ # Try to import deepspeed, install mpi4py if needed
  try:
-     import mpi4py
-     logger.info(f"mpi4py is available (version: {mpi4py.__version__})")
- except ImportError:
-     logger.warning("mpi4py not found. Installing mpi4py which is required for DeepSpeed...")
-     try:
-         import subprocess
+     import deepspeed
+ except ImportError as e:
+     if "mpi4py" in str(e):
+         logger.warning("mpi4py not found, installing...")
          import subprocess
-         # First try to install OpenMPI if on a Linux system
-         if sys.platform.startswith('linux'):
-             try:
-                 logger.info("Attempting to install OpenMPI system dependencies...")
-                 subprocess.check_call(["apt-get", "update", "-y"])
-                 subprocess.check_call(["apt-get", "install", "-y", "libopenmpi-dev", "openmpi-bin"])
-                 logger.info("OpenMPI installed successfully")
-             except Exception as e:
-                 logger.warning(f"Failed to install OpenMPI system dependencies: {e}")
-                 logger.warning("Will try to install mpi4py anyway")
-
-         # Now install mpi4py
-         subprocess.check_call([sys.executable, "-m", "pip", "install", "mpi4py>=3.1.4"])
-         import mpi4py
-         logger.info(f"mpi4py installed successfully (version: {mpi4py.__version__})")
-     except Exception as e:
-         logger.error(f"Failed to install mpi4py: {e}")
-         logger.error("DeepSpeed may not work correctly without mpi4py")
+         try:
+             subprocess.check_call([sys.executable, "-m", "pip", "install", "mpi4py"])
+             import deepspeed
+             logger.info("Successfully installed mpi4py and imported deepspeed")
+         except Exception as install_error:
+             logger.warning(f"Failed to install mpi4py: {install_error}")
+             logger.warning("Continuing without DeepSpeed MPI support")
+             # Set a flag to disable DeepSpeed later
+             os.environ["DISABLE_DEEPSPEED_MPI"] = "1"
+     else:
+         logger.error(f"Failed to import deepspeed: {e}")
+         raise
 
  # Disable all attention optimizations that might cause issues
  os.environ["TRANSFORMERS_NO_FLASH_ATTENTION"] = "1"
@@ -616,53 +602,42 @@ def train(config_path, dataset_name, output_dir):
      per_device_train_batch_size = 4 if gpu_count >= 4 else 2
      logger.info(f"Using batch size: {per_device_train_batch_size} per device (effective batch size: {per_device_train_batch_size * gpu_count * training_config.get('gradient_accumulation_steps', 4)})")
 
-     # Check if DeepSpeed config is available
+     # Check if DeepSpeed config is available and if MPI is disabled
      deepspeed_config = config.get("deepspeed_config", None)
-     if deepspeed_config:
+     if deepspeed_config and os.environ.get("DISABLE_DEEPSPEED_MPI", "0") != "1":
          logger.info("DeepSpeed configuration found - enabling DeepSpeed for distributed training")
 
-         # Check if mpi4py is available
-         mpi4py_available = False
-         try:
-             import mpi4py
-             mpi4py_available = True
-         except ImportError:
-             logger.error("mpi4py is required for DeepSpeed but not available")
-             logger.error("Will continue without DeepSpeed")
+         # Create a temporary DeepSpeed config file
+         ds_config_path = os.path.join(output_dir, "ds_config_temp.json")
+
+         # Update DeepSpeed config with dynamic values
+         if isinstance(deepspeed_config.get("train_micro_batch_size_per_gpu"), str) and deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
+             deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
 
-         if mpi4py_available:
-             try:
-                 # Create a temporary DeepSpeed config file
-                 ds_config_path = os.path.join(output_dir, "ds_config_temp.json")
-
-                 # Update DeepSpeed config with dynamic values
-                 if isinstance(deepspeed_config.get("train_micro_batch_size_per_gpu"), str) and deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
-                     deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
-
-                 if isinstance(deepspeed_config.get("train_batch_size"), str) and deepspeed_config.get("train_batch_size") == "auto":
-                     deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count
-
-                 # Write the DeepSpeed config to a file
-                 with open(ds_config_path, 'w') as f:
-                     json.dump(deepspeed_config, f, indent=2)
-
-                 logger.info(f"Created DeepSpeed config at {ds_config_path}")
-                 logger.info(f"DeepSpeed ZeRO Stage: {deepspeed_config.get('zero_optimization', {}).get('stage', 'Not specified')}")
-
-                 # Enable CPU offloading if configured
-                 if deepspeed_config.get("zero_optimization", {}).get("offload_optimizer", {}).get("device") == "cpu":
-                     logger.info("DeepSpeed CPU offloading enabled for optimizer states")
-
-                 # Set using_deepspeed flag
-                 using_deepspeed = True
-             except Exception as e:
-                 logger.error(f"Failed to initialize DeepSpeed: {e}")
-                 logger.error("Will continue without DeepSpeed")
-                 ds_config_path = None
-                 using_deepspeed = False
-         else:
-             ds_config_path = None
-             using_deepspeed = False
+         if isinstance(deepspeed_config.get("train_batch_size"), str) and deepspeed_config.get("train_batch_size") == "auto":
+             deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count
+
+         # Ensure communication backend is set to avoid MPI
+         if "communication_data_type" not in deepspeed_config:
+             deepspeed_config["communication_data_type"] = "fp16"
+
+         # Write the DeepSpeed config to a file
+         with open(ds_config_path, 'w') as f:
+             json.dump(deepspeed_config, f, indent=2)
+
+         logger.info(f"Created DeepSpeed config at {ds_config_path}")
+         logger.info(f"DeepSpeed ZeRO Stage: {deepspeed_config.get('zero_optimization', {}).get('stage', 'Not specified')}")
+
+         # Enable CPU offloading if configured
+         if deepspeed_config.get("zero_optimization", {}).get("offload_optimizer", {}).get("device") == "cpu":
+             logger.info("DeepSpeed CPU offloading enabled for optimizer states")
+
+         # Set using_deepspeed flag
+         using_deepspeed = True
+     elif os.environ.get("DISABLE_DEEPSPEED_MPI", "0") == "1":
+         logger.warning("DeepSpeed MPI support is disabled due to missing mpi4py. Continuing without DeepSpeed.")
+         ds_config_path = None
+         using_deepspeed = False
      else:
          logger.warning("No DeepSpeed configuration found - continuing without DeepSpeed")
          ds_config_path = None
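For reference, a sketch of the "auto" resolution the branch above performs; the ZeRO stage, offload settings, and batch sizes are illustrative placeholders, not values from this repo's config:

    import json

    # Hypothetical incoming config with "auto" placeholders
    deepspeed_config = {
        "zero_optimization": {"stage": 2, "offload_optimizer": {"device": "cpu"}},
        "train_micro_batch_size_per_gpu": "auto",
        "train_batch_size": "auto",
    }
    per_device_train_batch_size, gpu_count = 2, 1  # example values

    # Resolve the "auto" fields the same way the script does
    if deepspeed_config.get("train_micro_batch_size_per_gpu") == "auto":
        deepspeed_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size
    if deepspeed_config.get("train_batch_size") == "auto":
        deepspeed_config["train_batch_size"] = per_device_train_batch_size * gpu_count
    deepspeed_config.setdefault("communication_data_type", "fp16")

    # This is what would land in ds_config_temp.json
    print(json.dumps(deepspeed_config, indent=2))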
@@ -709,7 +684,6 @@ def train(config_path, dataset_name, output_dir):
          reports = ["none"]
          logger.warning("No reporting backends available - training metrics won't be logged")
 
-     # Prepare training arguments
      training_args_dict = {
          "output_dir": output_dir,
          "num_train_epochs": training_config.get("num_train_epochs", 3),
@@ -734,20 +708,23 @@ def train(config_path, dataset_name, output_dir):
          "dataloader_num_workers": 4,  # Use multiple workers for data loading
      }
 
-     # Add DeepSpeed config if available and mpi4py is installed
-     if using_deepspeed and ds_config_path is not None:
-         logger.info("Using DeepSpeed for training")
+     # Add DeepSpeed config path if available and enabled
+     if using_deepspeed and ds_config_path:
+         logger.info("Adding DeepSpeed configuration to training arguments")
          training_args_dict["deepspeed"] = ds_config_path
      else:
-         logger.info("Not using DeepSpeed - falling back to standard distributed training")
-         # If DeepSpeed is not available, ensure we're still using distributed training efficiently
-         if gpu_count > 1:
-             logger.info(f"Using standard distributed training with {gpu_count} GPUs")
-             training_args_dict["local_rank"] = int(os.environ.get("LOCAL_RANK", -1))
-             training_args_dict["gradient_checkpointing"] = True
+         logger.info("DeepSpeed is disabled - using standard distributed training")
 
      # Create TrainingArguments with validated parameters
-     training_args = TrainingArguments(**training_args_dict)
+     try:
+         training_args = TrainingArguments(**training_args_dict)
+     except Exception as e:
+         logger.error(f"Failed to create training arguments with DeepSpeed: {e}")
+         if "deepspeed" in training_args_dict:
+             logger.warning("Removing DeepSpeed configuration and trying again")
+             del training_args_dict["deepspeed"]
+             training_args = TrainingArguments(**training_args_dict)
+             using_deepspeed = False
 
      # Create trainer with pre-tokenized collator
      trainer = Trainer(
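The construct-then-retry pattern above can be read as a small standalone helper. A sketch (the function name is hypothetical, not from the script), assuming only that transformers is installed and args_dict holds valid TrainingArguments fields:

    from transformers import TrainingArguments

    def build_training_args(args_dict: dict) -> TrainingArguments:
        """Try DeepSpeed-enabled arguments first; drop the config and retry on failure."""
        try:
            return TrainingArguments(**args_dict)
        except Exception as err:
            if "deepspeed" not in args_dict:
                raise
            # Mirror the script: strip the DeepSpeed config and build plain arguments
            fallback = {k: v for k, v in args_dict.items() if k != "deepspeed"}
            return TrainingArguments(**fallback)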
 