File size: 8,119 Bytes
f7b283c 71ca212 f7b283c 71ca212 f7b283c 71ca212 f7b283c 71ca212 f7b283c 9decf80 71ca212 9decf80 f7b283c 9decf80 f7b283c 9decf80 f7b283c 9decf80 f7b283c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
from typing import Dict, Optional, Tuple
from pathlib import Path
import tensorflow as tf
import os
import subprocess
from datetime import datetime
from logger_config import config_logger
# Module-level logger built by the project's config_logger helper; used by
# every EnvironmentSetup method below for status and error reporting.
logger = config_logger(__name__)
class EnvironmentSetup:
    """Detect the compute device and prepare cache/training directories.

    Device detection runs in ``__init__`` and stores the result in
    ``device_type`` (``"TPU"``, ``"GPU"`` or ``"CPU"``) and ``strategy``
    (the matching ``tf.distribute`` strategy).  Directory creation is
    deferred to :meth:`initialize`.
    """

    # Ordered (marker in GPU name, batch multiplier, upper cap) rules applied
    # by optimize_batch_size when running on a Colab GPU.  First match wins.
    _COLAB_GPU_BATCH_RULES = (
        ("A100", 8, 64),
        ("T4", 2, 32),
        ("V100", 3, 48),
    )

    def __init__(self):
        """Detect devices immediately; directories stay unset until initialize()."""
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir = None
        # Defined up front (like cache_dir) so reading it before initialize()
        # yields None instead of raising AttributeError.
        self.training_dirs = None

    def initialize(self, cache_dir: Optional[Path] = None):
        """Create the model cache and a fresh training-run directory tree.

        Args:
            cache_dir: Cache location; ``None`` selects the default used by
                :meth:`setup_model_cache`.
        """
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()

    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Setup and manage model cache directory.

        Creates the directory if needed and points the ``TRANSFORMERS_CACHE``,
        ``TORCH_HOME`` and ``HF_HOME`` environment variables at sub-folders of
        it (process-wide side effect).

        Args:
            cache_dir: Desired cache directory.  ``None`` means
                ``~/.chatbot_cache``.  A plain string path is also accepted.

        Returns:
            The cache directory as a ``Path``.
        """
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'
        else:
            # Coerce so that callers passing a str do not crash on .mkdir / "/".
            cache_dir = Path(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)

        # Set environment variables for various libraries
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')

        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir

    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Setup directory structure for training artifacts.

        A new ``training_run_<YYYYmmdd_HHMMSS>`` folder is created under
        ``base_dir`` together with ``checkpoints``, ``plots`` and ``logs``
        sub-folders.

        Args:
            base_dir: Parent directory for all training runs.

        Returns:
            Mapping with keys ``'base'``, ``'checkpoints'``, ``'plots'`` and
            ``'logs'`` pointing at the created directories.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = Path(base_dir) / f"training_run_{timestamp}"

        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs',
        }

        # Create directories
        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        return directories

    @staticmethod
    def is_colab() -> bool:
        """Check if code is running in Google Colab.

        Returns:
            ``True`` when both ``google.colab`` and ``IPython`` can be
            imported, ``False`` otherwise.
        """
        try:
            # The imports themselves are the probe; the modules are not used.
            import google.colab  # type: ignore  # noqa: F401
            import IPython  # type: ignore  # noqa: F401
            return True
        except (ImportError, AttributeError):
            return False

    @staticmethod
    def _query_gpu_name() -> Optional[str]:
        """Return the GPU name reported by ``nvidia-smi``, or ``None`` if unavailable."""
        try:
            raw = subprocess.check_output(
                ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                stderr=subprocess.DEVNULL
            )
        except (subprocess.SubprocessError, FileNotFoundError):
            return None
        return raw.decode('utf-8').strip()

    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Setup TPU in Colab environment if available.

        Returns:
            A ``tf.distribute.TPUStrategy`` when running in Colab with
            ``COLAB_TPU_ADDR`` set and initialisation succeeds; ``None`` when
            not in Colab, when no TPU address is advertised, or when
            initialisation fails (the failure is logged as a warning).
        """
        if not self.is_colab():
            return None

        # Check TPU availability
        colab_tpu_addr = os.environ.get('COLAB_TPU_ADDR')
        if colab_tpu_addr is None:
            return None

        try:
            tpu_address = 'grpc://' + colab_tpu_addr
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            return tf.distribute.TPUStrategy(resolver)
        except Exception as e:
            # Best effort: any TPU failure falls back to GPU/CPU detection.
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None

    def _select_colab_device(self) -> Optional[Tuple[str, tf.distribute.Strategy]]:
        """Pick TPU, then GPU, inside Colab; ``None`` means fall back to CPU."""
        # Try TPU first in Colab
        tpu_strategy = self.setup_colab_tpu()
        if tpu_strategy is not None:
            logger.info("Colab TPU detected and initialized")
            return "TPU", tpu_strategy

        # Colab GPU setup
        gpus = tf.config.list_physical_devices('GPU')
        if not gpus:
            return None

        try:
            # Colab-specific GPU memory management
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)

            # The GPU model is informational only; failing to read it is not fatal.
            gpu_name = self._query_gpu_name()
            if gpu_name is None:
                logger.warning("Could not detect specific GPU model")
            else:
                logger.info(f"Colab GPU detected: {gpu_name}")

            return "GPU", tf.distribute.OneDeviceStrategy("/GPU:0")
        except Exception as e:
            logger.error(f"Error configuring Colab GPU: {str(e)}")
            return None

    def _select_local_device(self) -> Optional[Tuple[str, tf.distribute.Strategy]]:
        """Pick TPU, then GPU(s), outside Colab; ``None`` means fall back to CPU."""
        # Check for TPU
        try:
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            strategy = tf.distribute.TPUStrategy(resolver)
            logger.info("TPU detected and initialized")
            return "TPU", strategy
        except ValueError:
            logger.info("No TPU detected. Checking for GPUs...")

        # Check for GPUs
        gpus = tf.config.list_physical_devices('GPU')
        if not gpus:
            return None

        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)

            if len(gpus) > 1:
                strategy = tf.distribute.MirroredStrategy()
                logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
            else:
                strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                logger.info("Single GPU strategy set up")
            return "GPU", strategy
        except Exception as e:
            logger.error(f"Error configuring GPU: {str(e)}")
            return None

    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations.

        Preference order is TPU, then GPU, then CPU.  Any failure while
        configuring an accelerator is logged and the next option is tried.

        Returns:
            ``(device_type, strategy)`` where ``device_type`` is ``"TPU"``,
            ``"GPU"`` or ``"CPU"``.
        """
        logger.info("Checking available compute devices...")

        if self.is_colab():
            logger.info("Running in Google Colab environment")
            selection = self._select_colab_device()
        else:
            selection = self._select_local_device()

        if selection is not None:
            return selection

        # CPU fallback
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy

    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Colab-specific optimizations for training.

        Outside Colab the value is returned unchanged.  In Colab the batch
        size is multiplied and capped according to the detected accelerator
        (see ``_COLAB_GPU_BATCH_RULES`` for GPUs; TPU uses x4 capped at 64).

        Args:
            base_batch_size: Starting batch size.

        Returns:
            The adjusted batch size.
        """
        if not self.is_colab():
            return base_batch_size

        # Colab batch size optimization
        if self.device_type == "GPU":
            gpu_name = self._query_gpu_name()
            if gpu_name is None:
                logger.warning("Could not detect specific GPU model, using default settings")
            else:
                for marker, factor, cap in self._COLAB_GPU_BATCH_RULES:
                    if marker in gpu_name:
                        logger.info(f"Optimizing for Colab {marker} GPU")
                        base_batch_size = min(base_batch_size * factor, cap)
                        break
        elif self.device_type == "TPU":
            # TPU optimizations
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")

        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size
|