"""Train and evaluate a PPO agent that predicts sales-conversion probability
turn-by-turn from embedded sales conversations."""

import argparse
import gc
import json
import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Tuple

import gymnasium as gym
import numpy as np
import pandas as pd
import psutil
import torch
import torch.nn as nn
from gymnasium import spaces
from sklearn.model_selection import train_test_split
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from tqdm import tqdm

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("sales_training.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Pick the training device once; the PPO policy (including the custom feature
# extractor) is placed on it by stable-baselines3.
if torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    logger.info("GPU not available, using CPU")


@dataclass
class ConversationState:
    """Represents the state of a sales conversation for the RL environment."""

    conversation_history: List[Dict[str, str]]
    embedding: np.ndarray
    conversation_metrics: Dict[str, float]
    turn_number: int
    conversion_probabilities: List[float]

    @property
    def state_vector(self) -> np.ndarray:
        """Create a flat vector representation of the conversation state."""
        metric_values = np.array(list(self.conversation_metrics.values()), dtype=np.float32)
        turn_info = np.array([self.turn_number], dtype=np.float32)
        prob_history = np.array(self.conversion_probabilities, dtype=np.float32)

        # Keep only the 10 most recent predictions, zero-padded on the right
        # to a fixed length of 10.
        recent = prob_history[-10:]
        padded_probs = np.zeros(10, dtype=np.float32)
        padded_probs[:len(recent)] = recent

        return np.concatenate([
            self.embedding,
            metric_values,
            turn_info,
            padded_probs
        ])
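

# A minimal sketch (not called by the pipeline) of how ConversationState
# flattens into the observation the policy sees. The 16-dim embedding is an
# assumed placeholder; the real dimension comes from the dataset's
# embedding_* columns.
def _example_state_vector() -> np.ndarray:
    state = ConversationState(
        conversation_history=[{"speaker": "customer", "message": "Hi"}],
        embedding=np.zeros(16, dtype=np.float32),
        conversation_metrics={
            "customer_engagement": 0.5,
            "sales_effectiveness": 0.5,
            "conversation_length": 2,
            "outcome": 1.0,
            "progress": 0.0,
        },
        turn_number=0,
        conversion_probabilities=[0.5],
    )
    return state.state_vector  # shape: (16 + 5 + 1 + 10,) = (32,)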


class CustomLN(BaseFeaturesExtractor):
    """Custom feature extractor that maps the flat observation vector to a
    compact feature vector with a small MLP."""

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 128):
        super().__init__(observation_space, features_dim)

        n_input_channels = observation_space.shape[0]

        # stable-baselines3 moves the whole policy (including this extractor)
        # to the selected device, so no explicit .to(device) is needed here.
        self.linear_network = nn.Sequential(
            nn.Linear(n_input_channels, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, features_dim),
            nn.ReLU(),
        )

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        return self.linear_network(observations)
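

# A quick sketch (not part of training) of what the extractor does to a batch
# of observations. The 32-dim observation size is an assumed placeholder and
# must match the real observation_space.
def _example_feature_extractor() -> torch.Tensor:
    obs_space = spaces.Box(low=-np.inf, high=np.inf, shape=(32,), dtype=np.float32)
    extractor = CustomLN(obs_space, features_dim=64)
    dummy_obs = torch.randn(8, 32)  # batch of 8 flat observations
    return extractor(dummy_obs)  # -> tensor of shape (8, 64)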


class SalesConversionEnv(gym.Env):
    """Reinforcement learning environment for sales conversion prediction.

    Each episode replays one recorded conversation; at every turn the agent
    outputs a conversion probability and is rewarded for matching the
    ground-truth probability trajectory.
    """

    def __init__(self, conversations_df: pd.DataFrame, use_miniembeddings=True):
        """
        Initialize the environment.

        Args:
            conversations_df: DataFrame containing sales conversations
            use_miniembeddings: If True, reduce the embedding dimension to save memory
        """
        super().__init__()

        self.conversations_df = conversations_df
        self.current_conversation_idx = 0
        self.max_turns = 20
        self.use_miniembeddings = use_miniembeddings

        embedding_cols = [col for col in conversations_df.columns if col.startswith('embedding_')]
        self.full_embedding_dim = len(embedding_cols)

        if use_miniembeddings:
            self.embedding_dim = min(1024, self.full_embedding_dim)
            logger.info(f"Using reduced embeddings: {self.full_embedding_dim} -> {self.embedding_dim}")
        else:
            self.embedding_dim = self.full_embedding_dim

        # Action: a single conversion probability in [0, 1].
        self.action_space = spaces.Box(
            low=np.array([0.0]),
            high=np.array([1.0]),
            dtype=np.float32
        )

        # Observation: embedding + 5 conversation metrics + turn counter
        # + the 10 most recent predicted probabilities.
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.embedding_dim + 5 + 1 + 10,),
            dtype=np.float32
        )

        self.current_turn = 0
        self.conversation_state = None
        self.true_probabilities = None

        logger.info(f"Initialized SalesConversionEnv with {len(conversations_df)} conversations")

    def _parse_conversation(self, conversation_idx: int) -> Tuple[List[Dict[str, str]], Dict[str, float], Dict[int, float]]:
        """Parse conversation data from the dataset."""
        row = self.conversations_df.iloc[conversation_idx]

        try:
            messages = json.loads(row['conversation'])
        except (json.JSONDecodeError, TypeError, KeyError):
            # Fall back to a minimal two-turn exchange if the JSON is missing
            # or malformed.
            logger.warning(f"Could not parse conversation {conversation_idx}; using fallback messages")
            messages = [
                {"speaker": "customer", "message": "I'm interested in your product."},
                {"speaker": "sales_rep", "message": "Thank you for your interest. How can I help?"}
            ]

        metrics = {
            'customer_engagement': float(row.get('customer_engagement', 0.5)),
            'sales_effectiveness': float(row.get('sales_effectiveness', 0.5)),
            'conversation_length': int(row.get('conversation_length', len(messages))),
            'outcome': float(row.get('outcome', 0.5)),
            'progress': 0.0
        }

        try:
            probability_trajectory = json.loads(row['probability_trajectory'])
            # JSON object keys are strings; convert them to int turn indices.
            probability_trajectory = {int(k): float(v) for k, v in probability_trajectory.items()}
        except (json.JSONDecodeError, TypeError, KeyError):
            # Synthesize a monotone trajectory drifting toward the recorded outcome.
            if row.get('outcome', 0) == 1:
                probability_trajectory = {i: min(0.5 + i * 0.05, 0.95) for i in range(len(messages))}
            else:
                probability_trajectory = {i: max(0.5 - i * 0.05, 0.05) for i in range(len(messages))}

        return messages, metrics, probability_trajectory

    def _get_embedding_for_turn(self, conversation_idx: int, turn: int) -> np.ndarray:
        """Get the embedding for a specific conversation at a specific turn."""
        row = self.conversations_df.iloc[conversation_idx]

        embedding_cols = [col for col in row.index if col.startswith('embedding_')]
        try:
            embedding = row[embedding_cols].values.astype(np.float32)
            # Guard against corrupt rows: replace NaN/inf embeddings with zeros.
            if np.isnan(embedding).any() or np.isinf(embedding).any():
                embedding = np.zeros(len(embedding_cols), dtype=np.float32)
        except Exception:
            embedding = np.zeros(len(embedding_cols), dtype=np.float32)

        # Optionally shrink the embedding by strided mean-pooling: average
        # consecutive chunks of length full_dim // mini_dim, then truncate to
        # exactly embedding_dim values.
        if self.use_miniembeddings and len(embedding) > self.embedding_dim:
            stride = self.full_embedding_dim // self.embedding_dim
            embedding = np.array([
                np.mean(embedding[i:i + stride])
                for i in range(0, self.full_embedding_dim, stride)
            ][:self.embedding_dim], dtype=np.float32)

        # Scale the embedding up as the conversation progresses so later turns
        # carry more signal.
        progress = min(1.0, turn / self.max_turns)
        scaled_embedding = embedding * (0.6 + 0.4 * progress)

        return scaled_embedding
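
    # Worked example of the reduction above (arithmetic only): with
    # full_embedding_dim=4096 and embedding_dim=1024, stride = 4096 // 1024 = 4,
    # so the comprehension averages 1024 non-overlapping 4-value chunks; with
    # full_embedding_dim=1536, stride = 1, yielding 1536 single-value "means"
    # truncated to the first 1024.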

    def reset(self, seed=None, options=None) -> Tuple[np.ndarray, Dict]:
        """Reset the environment to start a new episode."""
        super().reset(seed=seed)

        # Sample a conversation with the env's own RNG so reset(seed=...) is
        # reproducible.
        self.current_conversation_idx = int(self.np_random.integers(0, len(self.conversations_df)))
        self.current_turn = 0

        messages, metrics, probability_trajectory = self._parse_conversation(self.current_conversation_idx)
        self.true_probabilities = probability_trajectory
        # Cap episodes at 20 turns, but keep at least one turn so an empty
        # conversation cannot produce a zero-length episode.
        self.max_turns = max(1, min(20, len(messages)))

        embedding = self._get_embedding_for_turn(self.current_conversation_idx, 0)
        metrics = metrics.copy()
        metrics['progress'] = 0.0

        self.conversation_state = ConversationState(
            conversation_history=messages[:1] if messages else [],
            embedding=embedding,
            conversation_metrics=metrics,
            turn_number=0,
            conversion_probabilities=[self.true_probabilities.get(0, 0.5)]
        )

        return self.conversation_state.state_vector, {}

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """Step the environment forward by one turn."""
        predicted_prob = float(action[0])

        true_prob = self.true_probabilities.get(self.current_turn, 0.5)

        # Dense reward: 1.0 for a perfect match, decreasing linearly with the
        # absolute prediction error.
        reward = 1.0 - abs(predicted_prob - true_prob)

        # On the final turn, add a penalty for calling the wrong side of 0.5,
        # proportional to the distance from the threshold.
        if self.current_turn == self.max_turns - 1:
            outcome = self.conversation_state.conversation_metrics['outcome']
            if outcome == 1 and predicted_prob < 0.5:
                reward -= 1.0 * (0.5 - predicted_prob)
            elif outcome == 0 and predicted_prob > 0.5:
                reward -= 1.0 * (predicted_prob - 0.5)

        self.current_turn += 1
        done = self.current_turn >= self.max_turns

        if not done:
            embedding = self._get_embedding_for_turn(self.current_conversation_idx, self.current_turn)
            metrics = self.conversation_state.conversation_metrics.copy()
            metrics['progress'] = self.current_turn / self.max_turns

            messages = self._parse_conversation(self.current_conversation_idx)[0]
            history = messages[:self.current_turn + 1] if self.current_turn + 1 < len(messages) else messages

            # Extend the running history of the agent's own predictions.
            conv_probs = self.conversation_state.conversion_probabilities.copy()
            conv_probs.append(predicted_prob)

            self.conversation_state = ConversationState(
                conversation_history=history,
                embedding=embedding,
                conversation_metrics=metrics,
                turn_number=self.current_turn,
                conversion_probabilities=conv_probs
            )

        return self.conversation_state.state_vector, reward, done, False, {'true_prob': true_prob}
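

# A self-contained sketch (not used by the trainer) of one episode through the
# environment with random actions. The column names mirror the schema the
# parsing code expects; the 16-dim embedding and all values are placeholders.
def _example_env_rollout() -> float:
    rng = np.random.default_rng(0)
    row = {f"embedding_{i}": float(rng.standard_normal()) for i in range(16)}
    row.update({
        "conversation": json.dumps([
            {"speaker": "customer", "message": "Hi, tell me about pricing."},
            {"speaker": "sales_rep", "message": "Happy to walk you through it."},
        ]),
        "probability_trajectory": json.dumps({"0": 0.4, "1": 0.6}),
        "customer_engagement": 0.7,
        "sales_effectiveness": 0.6,
        "conversation_length": 2,
        "outcome": 1.0,
    })
    env = SalesConversionEnv(pd.DataFrame([row]), use_miniembeddings=False)
    obs, _ = env.reset(seed=0)
    total_reward, done = 0.0, False
    while not done:
        obs, reward, done, _, info = env.step(env.action_space.sample())
        total_reward += reward
    return total_reward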


class SalesRLTrainer:
    """Trainer for the sales conversion prediction RL model."""

    def __init__(self, dataset_path: str, model_save_path: str = "sales_conversion_model",
                 use_miniembeddings: bool = True, batch_size: int = 64):
        """
        Initialize the trainer.

        Args:
            dataset_path: Path to the sales conversation dataset
            model_save_path: Path to save the trained model
            use_miniembeddings: Whether to use reduced embeddings to save memory
            batch_size: Batch size for training
        """
        self.dataset_path = dataset_path
        self.model_save_path = model_save_path
        self.use_miniembeddings = use_miniembeddings
        self.batch_size = batch_size
        self.df = None
        self.model = None
        self.train_df = None
        self.val_df = None

        # Make sure the model and log directories exist before training starts.
        os.makedirs(os.path.dirname(model_save_path) or ".", exist_ok=True)
        os.makedirs("logs", exist_ok=True)

        logger.info(f"Initialized SalesRLTrainer with dataset: {dataset_path}")

        self._log_memory_usage("Initial")

    def _log_memory_usage(self, step=""):
        """Log current CPU (RSS) and GPU memory usage in MB."""
        process = psutil.Process(os.getpid())
        cpu_mem = process.memory_info().rss / 1024 / 1024

        gpu_mem = 0
        if torch.cuda.is_available():
            gpu_mem = torch.cuda.memory_allocated() / 1024 / 1024

        logger.info(f"Memory usage [{step}] - CPU: {cpu_mem:.2f} MB, GPU: {gpu_mem:.2f} MB")

    def load_dataset(self, validation_split=0.1, sample_size=None):
        """
        Load and preprocess the sales conversation dataset.

        Args:
            validation_split: Proportion of data held out for validation
            sample_size: Optional limit on dataset size to save memory
        """
        logger.info(f"Loading dataset from {self.dataset_path}")
        try:
            # Read in chunks so very large CSVs never have to fit in memory all
            # at once; stop early once enough rows have been collected.
            chunks = []
            for chunk in pd.read_csv(self.dataset_path, chunksize=10000):
                chunks.append(chunk)
                if sample_size and sum(len(c) for c in chunks) >= sample_size:
                    break

            self.df = pd.concat(chunks)

            if sample_size and len(self.df) > sample_size:
                self.df = self.df.sample(sample_size, random_state=42)

            logger.info(f"Loaded dataset with shape: {self.df.shape}")

            embedding_cols = [col for col in self.df.columns if col.startswith('embedding_')]
            if not embedding_cols:
                raise ValueError("No embedding columns found in the dataset")

            logger.info(f"Found {len(embedding_cols)} embedding dimensions")

            # Downcast numeric columns to compact dtypes to reduce memory usage.
            for col in self.df.columns:
                if col.startswith('embedding_'):
                    self.df[col] = self.df[col].astype(np.float32)
                elif col in ['outcome', 'customer_engagement', 'sales_effectiveness']:
                    self.df[col] = self.df[col].astype(np.float32)
                elif col == 'conversation_length':
                    self.df[col] = self.df[col].astype(np.int32)

            # Deterministic train/validation split.
            train_idx, val_idx = train_test_split(
                np.arange(len(self.df)),
                test_size=validation_split,
                random_state=42
            )

            self.train_df = self.df.iloc[train_idx].reset_index(drop=True)
            self.val_df = self.df.iloc[val_idx].reset_index(drop=True)

            logger.info(f"Split dataset: {len(self.train_df)} training samples, {len(self.val_df)} validation samples")

            self._log_memory_usage("After dataset load")

            gc.collect()

        except Exception as e:
            logger.error(f"Error loading dataset: {str(e)}")
            raise
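
    # Expected CSV schema (a sketch inferred from the parsing code, not an
    # official spec):
    #   conversation             JSON list of {"speaker", "message"} dicts
    #   probability_trajectory   JSON dict: turn index -> true conversion probability
    #   customer_engagement      float in [0, 1]
    #   sales_effectiveness      float in [0, 1]
    #   conversation_length      int, number of turns
    #   outcome                  1.0 converted / 0.0 not converted
    #   embedding_0..embedding_N float embedding columns, one per dimension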

    def train(self, total_timesteps: int = 100000, learning_rate: float = 0.0003, n_envs: int = 1):
        """
        Train the RL model with GPU acceleration.

        Args:
            total_timesteps: Total timesteps for training
            learning_rate: Learning rate for the optimizer
            n_envs: Number of parallel environments (forced to 1 on GPU, see below)
        """
        if self.train_df is None:
            self.load_dataset()

        # On GPU, stick to a single environment so CUDA tensors are never
        # shared across subprocesses; parallel envs are only used on CPU.
        n_envs = 1 if torch.cuda.is_available() else n_envs

        def make_env(df_subset):
            """Create an environment factory bound to a subset of the data."""
            def _init():
                return SalesConversionEnv(df_subset, use_miniembeddings=self.use_miniembeddings)
            return _init

        # Give each parallel environment its own disjoint slice of the data.
        if n_envs > 1:
            subset_size = len(self.train_df) // n_envs
            env_makers = [
                make_env(self.train_df.iloc[i * subset_size:(i + 1) * subset_size if i < n_envs - 1 else len(self.train_df)])
                for i in range(n_envs)
            ]
            env = SubprocVecEnv(env_makers)
        else:
            env = DummyVecEnv([make_env(self.train_df)])

        val_env = DummyVecEnv([make_env(self.val_df)])

        # Policy with the custom feature extractor; net_arch is a dict of
        # pi/vf layer sizes (the format stable-baselines3 >= 1.8 expects).
        policy_kwargs = dict(
            activation_fn=nn.ReLU,
            net_arch=dict(pi=[128, 64], vf=[128, 64]),
            features_extractor_class=CustomLN,
            features_extractor_kwargs=dict(features_dim=64)
        )

        self.model = PPO(
            "MlpPolicy",
            env,
            policy_kwargs=policy_kwargs,
            learning_rate=learning_rate,
            n_steps=512,
            batch_size=self.batch_size,
            n_epochs=5,
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            clip_range_vf=0.2,
            ent_coef=0.01,
            vf_coef=0.5,
            max_grad_norm=0.5,
            tensorboard_log="./logs/",
            verbose=1,
            device=device
        )

        # Periodically evaluate on held-out data and keep the best checkpoint.
        eval_callback = EvalCallback(
            val_env,
            best_model_save_path=os.path.join(os.path.dirname(self.model_save_path) or ".", "best_model"),
            log_path="./logs/",
            eval_freq=max(2000, total_timesteps // 20),
            deterministic=True,
            render=False
        )

        checkpoint_callback = CheckpointCallback(
            save_freq=max(5000, total_timesteps // 10),
            save_path="./logs/checkpoints/",
            name_prefix="sales_model",
            save_replay_buffer=False,
            save_vecnormalize=False
        )

        self._log_memory_usage("Before training")

        logger.info(f"Starting training for {total_timesteps} timesteps with {n_envs} environments on {device}")
        self.model.learn(
            total_timesteps=total_timesteps,
            callback=[eval_callback, checkpoint_callback],
            progress_bar=True
        )

        self.model.save(self.model_save_path)
        logger.info(f"Model saved to {self.model_save_path}")

        self._log_memory_usage("After training")

        # Release environment and GPU memory before evaluation.
        env.close()
        val_env.close()
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def evaluate(self, num_episodes: int = 100):
        """Evaluate the trained model and report reward and classification metrics."""
        if self.model is None:
            logger.info(f"Loading model from {self.model_save_path}")
            self.model = PPO.load(self.model_save_path, device=device)

        if self.val_df is None:
            self.load_dataset()

        env = SalesConversionEnv(self.val_df, use_miniembeddings=self.use_miniembeddings)

        logger.info(f"Evaluating model on {num_episodes} episodes")

        rewards = []
        accuracies = []
        predictions = []
        true_outcomes = []

        for _ in tqdm(range(num_episodes), desc="Evaluating"):
            obs, _ = env.reset()
            done = False
            episode_reward = 0
            episode_predictions = []

            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                obs, reward, done, _, info = env.step(action)

                episode_reward += reward
                episode_predictions.append(float(action[0]))

            rewards.append(episode_reward)

            # Treat the final-turn prediction as the episode-level conversion
            # call; binarize both the prediction and the outcome at 0.5.
            final_pred = episode_predictions[-1]
            outcome = 1 if env.conversation_state.conversation_metrics['outcome'] >= 0.5 else 0
            correct = (final_pred >= 0.5) == (outcome == 1)
            accuracies.append(int(correct))

            predictions.append(final_pred)
            true_outcomes.append(outcome)

        mean_reward = np.mean(rewards)
        mean_accuracy = np.mean(accuracies)

        # Confusion-matrix counts for precision/recall/F1 at the 0.5 threshold.
        true_positives = sum(1 for p, t in zip(predictions, true_outcomes) if p >= 0.5 and t == 1)
        false_positives = sum(1 for p, t in zip(predictions, true_outcomes) if p >= 0.5 and t == 0)
        false_negatives = sum(1 for p, t in zip(predictions, true_outcomes) if p < 0.5 and t == 1)

        precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
        recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
        f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        logger.info("Evaluation results:")
        logger.info(f"- Mean reward: {mean_reward:.4f}")
        logger.info(f"- Prediction accuracy: {mean_accuracy:.4f}")
        logger.info(f"- Precision: {precision:.4f}")
        logger.info(f"- Recall: {recall:.4f}")
        logger.info(f"- F1 Score: {f1_score:.4f}")

        return {
            'mean_reward': float(mean_reward),
            'accuracy': float(mean_accuracy),
            'precision': float(precision),
            'recall': float(recall),
            'f1_score': float(f1_score)
        }
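

# A minimal sketch of scoring one observation with a previously saved policy.
# The model path is a placeholder; a zero vector stands in for a real
# conversation state.
def _example_predict_probability() -> float:
    model = PPO.load("models/sales_conversion_model", device=device)
    obs = np.zeros(model.observation_space.shape, dtype=np.float32)
    action, _ = model.predict(obs, deterministic=True)
    return float(action[0])  # predicted conversion probability in [0, 1]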


def main():
    """Main function to run the training pipeline."""
    parser = argparse.ArgumentParser(description="Train a sales conversion prediction model")
    parser.add_argument("--dataset", type=str, required=True,
                        help="Path to the dataset CSV file")
    parser.add_argument("--model_path", type=str, default="models/sales_conversion_model",
                        help="Path to save the trained model")
    parser.add_argument("--timesteps", type=int, default=50000,
                        help="Number of timesteps to train for")
    parser.add_argument("--learning_rate", type=float, default=0.0003,
                        help="Learning rate for training")
    parser.add_argument("--batch_size", type=int, default=64,
                        help="Batch size for training")
    parser.add_argument("--sample_size", type=int, default=None,
                        help="Limit dataset size to save memory (e.g., 10000)")
    parser.add_argument("--evaluate_only", action="store_true",
                        help="Only evaluate an existing model without training")
    parser.add_argument("--num_eval_episodes", type=int, default=50,
                        help="Number of episodes for evaluation")
    parser.add_argument("--use_small_embedding", action="store_true",
                        help="Use a reduced embedding dimension to save memory")

    args = parser.parse_args()

    trainer = SalesRLTrainer(
        dataset_path=args.dataset,
        model_save_path=args.model_path,
        use_miniembeddings=args.use_small_embedding,
        batch_size=args.batch_size
    )

    trainer.load_dataset(sample_size=args.sample_size)

    if not args.evaluate_only:
        trainer.train(
            total_timesteps=args.timesteps,
            learning_rate=args.learning_rate
        )

    eval_results = trainer.evaluate(num_episodes=args.num_eval_episodes)

    print("\nEvaluation Results:")
    for metric, value in eval_results.items():
        print(f"- {metric}: {value:.4f}")


if __name__ == "__main__":
    main()
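
# Example invocation (a sketch; the script filename and dataset path are
# placeholders):
#   python sales_rl_trainer.py --dataset data/sales_conversations.csv \
#       --timesteps 50000 --sample_size 10000 --use_small_embedding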