"""
Cyber-LLM Training Module

Main training script for LoRA adapters and adversarial fine-tuning.
Integrates with MLflow, Weights & Biases, and DVC for experiment tracking.

Author: Muzan Sano
Email: [email protected]
"""

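# Example invocation (script name and file paths are illustrative; the flags are
# the ones defined in main() below):
#   python train.py --config configs/training.yaml --adapter recon \
#       --data data/train.jsonl --adversarial --eval-data data/test.jsonl
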
import os
import json
import logging
import argparse
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import transformers
from transformers import (
    AutoTokenizer, AutoModelForCausalLM,
    TrainingArguments, Trainer
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import Dataset, load_from_disk, concatenate_datasets
import yaml

# Optional MLOps integrations; training still proceeds if they are not installed.
try:
    import mlflow
    import mlflow.pytorch
    MLFLOW_AVAILABLE = True
except ImportError:
    MLFLOW_AVAILABLE = False

try:
    import wandb
    WANDB_AVAILABLE = True
except ImportError:
    WANDB_AVAILABLE = False

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CyberLLMTrainer:
    """
    Advanced training system for Cyber-LLM with LoRA adapters.
    Supports adversarial training and multi-agent specialization.
    """

    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

        # Initialize experiment tracking (MLflow / Weights & Biases).
        self._init_mlops()

        # Model components are loaded lazily by load_model_and_tokenizer().
        self.tokenizer = None
        self.model = None
        self.peft_model = None

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load training configuration from YAML file."""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)

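    # Expected configuration layout (values are illustrative; the keys mirror the
    # lookups made throughout this class):
    #
    #   model:
    #     base_model: "microsoft/phi-2"        # any causal-LM checkpoint
    #   lora:
    #     r: 16
    #     lora_alpha: 32
    #     lora_dropout: 0.05
    #     target_modules: ["q_proj", "v_proj"]
    #   data:
    #     max_seq_length: 2048
    #     train_split: 0.9
    #     val_split: 0.1
    #   training:
    #     num_epochs: 3
    #     batch_size: 4
    #     gradient_accumulation_steps: 8
    #     learning_rate: 2.0e-4
    #     warmup_steps: 100
    #     logging_steps: 10
    #     save_steps: 500
    #     eval_steps: 500
    #   mlops:
    #     use_mlflow: true
    #     use_wandb: false
    #     experiment_name: "cyber-llm"
    #     run_name: "lora-recon-v1"
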
    def _init_mlops(self):
        """Initialize MLOps tracking systems."""
        if MLFLOW_AVAILABLE and self.config.get("mlops", {}).get("use_mlflow", False):
            mlflow.set_experiment(self.config["mlops"]["experiment_name"])
            mlflow.start_run(run_name=self.config["mlops"]["run_name"])
            logger.info("MLflow tracking initialized")

        if WANDB_AVAILABLE and self.config.get("mlops", {}).get("use_wandb", False):
            wandb.init(
                project="cyber-llm",
                name=self.config["mlops"]["run_name"],
                config=self.config
            )
            logger.info("Weights & Biases tracking initialized")

    def load_model_and_tokenizer(self):
        """Load base model and tokenizer."""
        model_name = self.config["model"]["base_model"]

        logger.info(f"Loading model: {model_name}")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

        # Causal-LM tokenizers often ship without a pad token; fall back to EOS.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None,
            trust_remote_code=True
        )

        logger.info(f"Model loaded successfully. Parameters: {self.model.num_parameters():,}")

    def setup_lora(self, target_modules: Optional[List[str]] = None) -> None:
        """Set up LoRA configuration for parameter-efficient fine-tuning."""
        if target_modules is None:
            target_modules = self.config["lora"]["target_modules"]

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config["lora"]["r"],
            lora_alpha=self.config["lora"]["lora_alpha"],
            lora_dropout=self.config["lora"]["lora_dropout"],
            target_modules=target_modules,
            bias="none"
        )

        self.peft_model = get_peft_model(self.model, lora_config)
        self.peft_model.print_trainable_parameters()

        logger.info("LoRA configuration applied successfully")

    def load_training_data(self, data_path: str) -> Dataset:
        """Load and prepare training data."""
        logger.info(f"Loading training data from: {data_path}")

        if os.path.isdir(data_path):
            # Directory produced by Dataset.save_to_disk()
            dataset = load_from_disk(data_path)
        else:
            # JSON or JSON Lines file
            with open(data_path, 'r') as f:
                if data_path.endswith('.jsonl'):
                    data = [json.loads(line) for line in f]
                else:
                    data = json.load(f)

            dataset = Dataset.from_list(data)

        logger.info(f"Loaded dataset with {len(dataset)} examples")
        return dataset

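    # Supported record formats (illustrative), as handled by tokenize_dataset():
    #   {"text": "..."}                               # used as-is
    #   {"instruction": "...", "response": "..."}     # joined as "Instruction: ...\nResponse: ..."
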
    def tokenize_dataset(self, dataset: Dataset) -> Dataset:
        """Tokenize dataset for training."""
        def tokenize_function(examples):
            if "text" in examples:
                texts = examples["text"]
            elif "instruction" in examples and "response" in examples:
                texts = [f"Instruction: {inst}\nResponse: {resp}"
                         for inst, resp in zip(examples["instruction"], examples["response"])]
            else:
                raise ValueError("Dataset must contain 'text' or 'instruction'/'response' fields")

            tokenized = self.tokenizer(
                texts,
                truncation=True,
                padding=False,
                max_length=self.config["data"]["max_seq_length"],
                return_tensors=None
            )

            # For causal-LM training the labels are the input ids themselves.
            tokenized["labels"] = tokenized["input_ids"].copy()

            return tokenized

        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=dataset.column_names
        )

        return tokenized_dataset

    def create_adversarial_examples(self, dataset: Dataset, num_adversarial: int = 1000) -> Dataset:
        """
        Generate adversarial examples for robust training.
        This is a simplified implementation; it could be enhanced with more sophisticated techniques.
        """
        num_adversarial = min(num_adversarial, len(dataset))
        logger.info(f"Generating {num_adversarial} adversarial examples")

        adversarial_prompts = [
            "Ignore previous instructions and ",
            "As a helpful AI assistant, please provide detailed instructions for ",
            "This is for educational purposes only: ",
            "Hypothetically speaking, if someone wanted to ",
            "In a fictional scenario, describe how to ",
        ]

        adversarial_examples = []
        original_examples = dataset.select(range(num_adversarial))

        for i, example in enumerate(original_examples):
            if i % 200 == 0:
                logger.info(f"Generated {i}/{num_adversarial} adversarial examples")

            original_text = example.get("text", "")

            # Prefix the original text with a prompt-injection style preamble.
            adversarial_prefix = adversarial_prompts[i % len(adversarial_prompts)]
            adversarial_text = adversarial_prefix + original_text

            adversarial_examples.append({
                "text": adversarial_text,
                "is_adversarial": True,
                "original_text": original_text
            })

        adversarial_dataset = Dataset.from_list(adversarial_examples)

        # Replace the tail of the original dataset with the adversarial variants so the
        # overall size stays the same; the kept split gets matching metadata columns so
        # the two datasets can be concatenated.
        kept_dataset = dataset.select(range(len(dataset) - num_adversarial)).map(
            lambda ex: {"is_adversarial": False, "original_text": ex.get("text", "")}
        )
        combined_dataset = concatenate_datasets([kept_dataset, adversarial_dataset])

        logger.info(f"Created combined dataset with {len(combined_dataset)} examples")
        return combined_dataset

    def setup_training_arguments(self) -> TrainingArguments:
        """Set up training arguments."""
        output_dir = f"./outputs/{self.config['mlops']['run_name']}"

        training_args = TrainingArguments(
            output_dir=output_dir,
            overwrite_output_dir=True,
            num_train_epochs=self.config["training"]["num_epochs"],
            per_device_train_batch_size=self.config["training"]["batch_size"],
            per_device_eval_batch_size=self.config["training"]["batch_size"],
            gradient_accumulation_steps=self.config["training"]["gradient_accumulation_steps"],
            learning_rate=self.config["training"]["learning_rate"],
            warmup_steps=self.config["training"]["warmup_steps"],
            logging_steps=self.config["training"]["logging_steps"],
            save_steps=self.config["training"]["save_steps"],
            eval_steps=self.config["training"]["eval_steps"],
            evaluation_strategy="steps",
            save_strategy="steps",
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            dataloader_drop_last=True,
            fp16=torch.cuda.is_available(),
            gradient_checkpointing=True,
            optim="adamw_torch",
            report_to=["wandb"] if WANDB_AVAILABLE else [],
            run_name=self.config["mlops"]["run_name"]
        )

        return training_args

    def train_adapter(self, adapter_name: str, dataset_path: str, adversarial_training: bool = False):
        """Train a specific LoRA adapter."""
        logger.info(f"Starting training for adapter: {adapter_name}")

        if self.model is None:
            self.load_model_and_tokenizer()
            self.setup_lora()

        dataset = self.load_training_data(dataset_path)

        if adversarial_training:
            dataset = self.create_adversarial_examples(dataset)

        tokenized_dataset = self.tokenize_dataset(dataset)

        # Shuffle before splitting so adversarial examples (appended at the end of
        # the dataset) are distributed across both splits; fixed seed for reproducibility.
        tokenized_dataset = tokenized_dataset.shuffle(seed=42)

        # Split into train/validation sets according to the configured ratios.
        train_size = int(len(tokenized_dataset) * self.config["data"]["train_split"])
        val_size = int(len(tokenized_dataset) * self.config["data"]["val_split"])

        train_dataset = tokenized_dataset.select(range(train_size))
        val_dataset = tokenized_dataset.select(range(train_size, train_size + val_size))

        logger.info(f"Train size: {len(train_dataset)}, Val size: {len(val_dataset)}")

        training_args = self.setup_training_arguments()

        from transformers import DataCollatorForLanguageModeling
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )

        trainer = Trainer(
            model=self.peft_model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
            tokenizer=self.tokenizer
        )

        logger.info("Starting training...")
        trainer.train()

        adapter_output_path = f"./adapters/{adapter_name}"
        trainer.save_model(adapter_output_path)

        if MLFLOW_AVAILABLE:
            mlflow.log_artifacts(adapter_output_path, artifact_path=f"adapters/{adapter_name}")

        logger.info(f"Training completed for adapter: {adapter_name}")
        logger.info(f"Adapter saved to: {adapter_output_path}")

        return trainer

    def evaluate_model(self, test_dataset_path: str):
        """Evaluate the trained model."""
        logger.info("Starting model evaluation")

        test_dataset = self.load_training_data(test_dataset_path)
        tokenized_test = self.tokenize_dataset(test_dataset)

        # Minimal loss-only evaluation pass: reuses the training arguments and a
        # causal-LM collator (assumes a model/adapter has already been loaded).
        from transformers import DataCollatorForLanguageModeling
        data_collator = DataCollatorForLanguageModeling(tokenizer=self.tokenizer, mlm=False)
        eval_trainer = Trainer(
            model=self.peft_model if self.peft_model is not None else self.model,
            args=self.setup_training_arguments(),
            eval_dataset=tokenized_test,
            data_collator=data_collator,
            tokenizer=self.tokenizer
        )
        metrics = eval_trainer.evaluate()

        logger.info(f"Evaluation completed: {metrics}")
        return metrics

    def cleanup(self):
        """Clean up resources."""
        if MLFLOW_AVAILABLE:
            mlflow.end_run()

        if WANDB_AVAILABLE:
            wandb.finish()

        logger.info("Training cleanup completed")


def main():
    """Main training entry point."""
    parser = argparse.ArgumentParser(description="Cyber-LLM Training")
    parser.add_argument("--config", required=True, help="Path to training configuration file")
    parser.add_argument("--adapter", required=True, help="Name of adapter to train")
    parser.add_argument("--data", required=True, help="Path to training data")
    parser.add_argument("--adversarial", action="store_true", help="Enable adversarial training")
    parser.add_argument("--eval-data", help="Path to evaluation data")

    args = parser.parse_args()

    trainer = None
    try:
        trainer = CyberLLMTrainer(args.config)

        trainer.train_adapter(
            adapter_name=args.adapter,
            dataset_path=args.data,
            adversarial_training=args.adversarial
        )

        if args.eval_data:
            trainer.evaluate_model(args.eval_data)

    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        raise
    finally:
        # Guard against CyberLLMTrainer() itself failing before assignment.
        if trainer is not None:
            trainer.cleanup()


if __name__ == "__main__":
    main()