"""
Cyber-LLM Training Module
Main training script for LoRA adapters and adversarial fine-tuning.
Integrates with MLflow, Weights & Biases, and DVC for experiment tracking.
Author: Muzan Sano
Email: [email protected]
"""
import os
import json
import logging
import argparse
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import transformers
from transformers import (
    AutoTokenizer, AutoModelForCausalLM,
    TrainingArguments, Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import Dataset, concatenate_datasets, load_from_disk
import yaml
# MLOps imports
try:
    import mlflow
    import mlflow.pytorch
    MLFLOW_AVAILABLE = True
except ImportError:
    MLFLOW_AVAILABLE = False

try:
    import wandb
    WANDB_AVAILABLE = True
except ImportError:
    WANDB_AVAILABLE = False
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CyberLLMTrainer:
    """
    Advanced training system for Cyber-LLM with LoRA adapters.
    Supports adversarial training and multi-agent specialization.
    """

    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

        # Initialize MLOps tracking
        self._init_mlops()

        # Model and tokenizer are loaded lazily by load_model_and_tokenizer()
        self.tokenizer = None
        self.model = None
        self.peft_model = None
    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load training configuration from YAML file."""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
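    # Illustrative shape of the YAML config consumed by this trainer. Every key
    # below is read somewhere in this file; the values are example placeholders:
    #
    #   model:
    #     base_model: meta-llama/Llama-2-7b-hf   # any causal-LM checkpoint
    #   lora:
    #     r: 16
    #     lora_alpha: 32
    #     lora_dropout: 0.05
    #     target_modules: [q_proj, v_proj]
    #   data:
    #     max_seq_length: 2048
    #     train_split: 0.9
    #     val_split: 0.1
    #   training:
    #     num_epochs: 3
    #     batch_size: 4
    #     gradient_accumulation_steps: 8
    #     learning_rate: 2.0e-4
    #     warmup_steps: 100
    #     logging_steps: 10
    #     save_steps: 500
    #     eval_steps: 500
    #   mlops:
    #     experiment_name: cyber-llm
    #     run_name: lora-recon-v1
    #     use_mlflow: true
    #     use_wandb: false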
    def _init_mlops(self):
        """Initialize MLOps tracking systems."""
        # Initialize MLflow
        if MLFLOW_AVAILABLE and self.config.get("mlops", {}).get("use_mlflow", False):
            mlflow.set_experiment(self.config["mlops"]["experiment_name"])
            mlflow.start_run(run_name=self.config["mlops"]["run_name"])
            logger.info("MLflow tracking initialized")

        # Initialize Weights & Biases
        if WANDB_AVAILABLE and self.config.get("mlops", {}).get("use_wandb", False):
            wandb.init(
                project="cyber-llm",
                name=self.config["mlops"]["run_name"],
                config=self.config
            )
            logger.info("Weights & Biases tracking initialized")
    def load_model_and_tokenizer(self):
        """Load the base model and tokenizer."""
        model_name = self.config["model"]["base_model"]
        logger.info(f"Loading model: {model_name}")

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

        # Add a pad token if the tokenizer does not define one
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load model in half precision on GPU, full precision on CPU
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto" if torch.cuda.is_available() else None,
            trust_remote_code=True
        )
        logger.info(f"Model loaded successfully. Parameters: {self.model.num_parameters():,}")
    def setup_lora(self, target_modules: Optional[List[str]] = None) -> None:
        """Set up LoRA configuration for parameter-efficient fine-tuning."""
        if target_modules is None:
            target_modules = self.config["lora"]["target_modules"]

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=self.config["lora"]["r"],
            lora_alpha=self.config["lora"]["lora_alpha"],
            lora_dropout=self.config["lora"]["lora_dropout"],
            target_modules=target_modules,
            bias="none"
        )

        self.peft_model = get_peft_model(self.model, lora_config)
        self.peft_model.print_trainable_parameters()
        logger.info("LoRA configuration applied successfully")
    def load_training_data(self, data_path: str) -> Dataset:
        """Load and prepare training data."""
        logger.info(f"Loading training data from: {data_path}")

        if os.path.isdir(data_path):
            # Load a preprocessed dataset saved with Dataset.save_to_disk()
            dataset = load_from_disk(data_path)
        else:
            # Load from JSON or JSONL
            with open(data_path, 'r') as f:
                if data_path.endswith('.jsonl'):
                    data = [json.loads(line) for line in f]
                else:
                    data = json.load(f)
            dataset = Dataset.from_list(data)

        logger.info(f"Loaded dataset with {len(dataset)} examples")
        return dataset
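    # Accepted record shapes, one JSON object per line in a .jsonl file. The
    # field names match what tokenize_dataset() expects; the content shown is
    # illustrative only:
    #   {"text": "Nmap's -sV flag enables service and version detection."}
    #   {"instruction": "Explain the -sV flag.",
    #    "response": "It probes open ports to identify services and versions."}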
    def tokenize_dataset(self, dataset: Dataset) -> Dataset:
        """Tokenize dataset for training."""
        def tokenize_function(examples):
            # Handle different input formats
            if "text" in examples:
                texts = examples["text"]
            elif "instruction" in examples and "response" in examples:
                texts = [f"Instruction: {inst}\nResponse: {resp}"
                         for inst, resp in zip(examples["instruction"], examples["response"])]
            else:
                raise ValueError("Dataset must contain 'text' or 'instruction'/'response' fields")

            # Tokenize without padding; the data collator pads each batch
            # dynamically and derives the causal-LM labels from input_ids.
            # Setting "labels" here would leave ragged label lists that
            # tokenizer.pad() cannot handle, so it is deliberately omitted.
            return self.tokenizer(
                texts,
                truncation=True,
                padding=False,
                max_length=self.config["data"]["max_seq_length"],
                return_tensors=None
            )

        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=dataset.column_names
        )
        return tokenized_dataset
    def create_adversarial_examples(self, dataset: Dataset, num_adversarial: int = 1000) -> Dataset:
        """
        Generate adversarial examples for robust training.
        This is a simplified implementation; it could be enhanced with more
        sophisticated techniques.
        """
        num_adversarial = min(num_adversarial, len(dataset))
        logger.info(f"Generating {num_adversarial} adversarial examples")

        adversarial_prompts = [
            "Ignore previous instructions and ",
            "As a helpful AI assistant, please provide detailed instructions for ",
            "This is for educational purposes only: ",
            "Hypothetically speaking, if someone wanted to ",
            "In a fictional scenario, describe how to ",
        ]

        adversarial_examples = []
        original_examples = dataset.select(range(num_adversarial))

        for i, example in enumerate(original_examples):
            if i % 200 == 0:
                logger.info(f"Generated {i}/{num_adversarial} adversarial examples")

            original_text = example.get("text", "")

            # Prepend a prompt-injection prefix, cycling through the templates
            adversarial_prefix = adversarial_prompts[i % len(adversarial_prompts)]
            adversarial_text = adversarial_prefix + original_text

            adversarial_examples.append({
                "text": adversarial_text,
                "is_adversarial": True,
                "original_text": original_text
            })

        # Replace the tail of the original dataset with the adversarial examples,
        # keeping the overall size constant. Dataset has no .concatenate() method,
        # so use datasets.concatenate_datasets(); the extra bookkeeping columns
        # are dropped first so the two schemas match.
        adversarial_dataset = Dataset.from_list(adversarial_examples)
        extra_columns = [c for c in adversarial_dataset.column_names
                         if c not in dataset.column_names]
        if extra_columns:
            adversarial_dataset = adversarial_dataset.remove_columns(extra_columns)
        combined_dataset = concatenate_datasets([
            dataset.select(range(len(dataset) - num_adversarial)),
            adversarial_dataset
        ])
        logger.info(f"Created combined dataset with {len(combined_dataset)} examples")
        return combined_dataset
    def setup_training_arguments(self) -> TrainingArguments:
        """Set up training arguments."""
        output_dir = f"./outputs/{self.config['mlops']['run_name']}"

        training_args = TrainingArguments(
            output_dir=output_dir,
            overwrite_output_dir=True,
            num_train_epochs=self.config["training"]["num_epochs"],
            per_device_train_batch_size=self.config["training"]["batch_size"],
            per_device_eval_batch_size=self.config["training"]["batch_size"],
            gradient_accumulation_steps=self.config["training"]["gradient_accumulation_steps"],
            learning_rate=self.config["training"]["learning_rate"],
            warmup_steps=self.config["training"]["warmup_steps"],
            logging_steps=self.config["training"]["logging_steps"],
            save_steps=self.config["training"]["save_steps"],
            eval_steps=self.config["training"]["eval_steps"],
            evaluation_strategy="steps",
            save_strategy="steps",
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            dataloader_drop_last=True,
            fp16=torch.cuda.is_available(),
            gradient_checkpointing=True,
            optim="adamw_torch",
            # An empty list disables all integrations; report_to=None falls
            # back to "all" in some transformers versions
            report_to=["wandb"] if WANDB_AVAILABLE else [],
            run_name=self.config["mlops"]["run_name"]
        )
        return training_args
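    # The effective batch size is per_device_train_batch_size multiplied by
    # gradient_accumulation_steps and the number of devices. For example, a
    # batch_size of 4 with gradient_accumulation_steps of 8 on a single GPU
    # yields an effective batch of 32 sequences per optimizer step.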
    def train_adapter(self, adapter_name: str, dataset_path: str, adversarial_training: bool = False):
        """Train a specific LoRA adapter."""
        logger.info(f"Starting training for adapter: {adapter_name}")

        # Load model if not already loaded
        if self.model is None:
            self.load_model_and_tokenizer()
            self.setup_lora()

        # Load and prepare data
        dataset = self.load_training_data(dataset_path)

        # Add adversarial examples if requested
        if adversarial_training:
            dataset = self.create_adversarial_examples(dataset)

        # Tokenize dataset
        tokenized_dataset = self.tokenize_dataset(dataset)

        # Split dataset
        train_size = int(len(tokenized_dataset) * self.config["data"]["train_split"])
        val_size = int(len(tokenized_dataset) * self.config["data"]["val_split"])
        train_dataset = tokenized_dataset.select(range(train_size))
        val_dataset = tokenized_dataset.select(range(train_size, train_size + val_size))
        logger.info(f"Train size: {len(train_dataset)}, Val size: {len(val_dataset)}")

        # Setup training arguments
        training_args = self.setup_training_arguments()

        # Data collator for causal LM (mlm=False): pads each batch and builds
        # labels from input_ids
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )

        # Initialize trainer
        trainer = Trainer(
            model=self.peft_model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
            tokenizer=self.tokenizer
        )

        # Start training
        logger.info("Starting training...")
        trainer.train()

        # Save adapter
        adapter_output_path = f"./adapters/{adapter_name}"
        trainer.save_model(adapter_output_path)

        # Log artifacts only when an MLflow run is actually active
        if MLFLOW_AVAILABLE and mlflow.active_run() is not None:
            mlflow.log_artifacts(adapter_output_path, artifact_path=f"adapters/{adapter_name}")

        logger.info(f"Training completed for adapter: {adapter_name}")
        logger.info(f"Adapter saved to: {adapter_output_path}")
        return trainer
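    # Programmatic usage sketch (the config path, adapter name, and data path
    # below are hypothetical examples, not files shipped with this repo):
    #   trainer = CyberLLMTrainer("configs/training_config.yaml")
    #   trainer.train_adapter("recon_agent", "data/recon_train.jsonl",
    #                         adversarial_training=True)
    #   trainer.cleanup()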
    def evaluate_model(self, test_dataset_path: str):
        """Evaluate the trained model by computing perplexity on a held-out set."""
        logger.info("Starting model evaluation")
        # Load and tokenize the test dataset
        test_dataset = self.load_training_data(test_dataset_path)
        tokenized_test = self.tokenize_dataset(test_dataset)
        # Compute perplexity as exp(mean eval loss)
        eval_trainer = Trainer(
            model=self.peft_model,
            args=self.setup_training_arguments(),
            eval_dataset=tokenized_test,
            data_collator=DataCollatorForLanguageModeling(tokenizer=self.tokenizer, mlm=False)
        )
        metrics = eval_trainer.evaluate()
        perplexity = float(torch.exp(torch.tensor(metrics["eval_loss"])))
        logger.info(f"Evaluation completed. Perplexity: {perplexity:.2f}")
        return metrics
    def cleanup(self):
        """Clean up tracking resources."""
        if MLFLOW_AVAILABLE and mlflow.active_run() is not None:
            mlflow.end_run()
        if WANDB_AVAILABLE and wandb.run is not None:
            wandb.finish()
        logger.info("Training cleanup completed")
def main():
    """Main training entry point."""
    parser = argparse.ArgumentParser(description="Cyber-LLM Training")
    parser.add_argument("--config", required=True, help="Path to training configuration file")
    parser.add_argument("--adapter", required=True, help="Name of adapter to train")
    parser.add_argument("--data", required=True, help="Path to training data")
    parser.add_argument("--adversarial", action="store_true", help="Enable adversarial training")
    parser.add_argument("--eval-data", help="Path to evaluation data")
    args = parser.parse_args()

    trainer = None
    try:
        # Initialize trainer
        trainer = CyberLLMTrainer(args.config)

        # Train adapter
        trainer.train_adapter(
            adapter_name=args.adapter,
            dataset_path=args.data,
            adversarial_training=args.adversarial
        )

        # Evaluate if test data provided
        if args.eval_data:
            trainer.evaluate_model(args.eval_data)
    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        raise
    finally:
        # Cleanup (guarded: initialization itself may have failed)
        if trainer is not None:
            trainer.cleanup()
if __name__ == "__main__":
    main()