import argparse
import logging
from typing import Dict, List

import numpy as np
import torch
from llama_cpp import Llama
from stable_baselines3 import PPO
from transformers import AutoModel, AutoTokenizer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SalesConversionPredictor:
    """Sales conversion prediction using Hugging Face embeddings, a llama.cpp LLM,
    and a trained PPO policy."""

    def __init__(self,
                 model_path: str,
                 embedding_model_name: str = "BAAI/bge-large-en-v1.5",
                 llm_gguf_path: str = "path/to/your/llama-3.2-1b-instruct.gguf",
                 use_gpu: bool = True,
                 n_gpu_layers: int = -1,
                 n_ctx: int = 2048,
                 use_mini_embeddings: bool = True):
        """Initialize with Hugging Face embeddings and a llama.cpp LLM."""
        self.device = torch.device("cuda" if torch.cuda.is_available() and use_gpu else "cpu")
        logger.info(f"Using device: {self.device}")

        logger.info(f"Loading embedding model: {embedding_model_name}")
        self.embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
        self.embedding_model = AutoModel.from_pretrained(embedding_model_name).to(self.device)

        self.use_mini_embeddings = use_mini_embeddings
        self.embedding_dim = 1024  # bge-large-en-v1.5 and bge-m3 both emit 1024-d vectors

        # Llama.from_pretrained treats llm_gguf_path as a Hugging Face repo_id and
        # downloads the first file matching `filename`; pass a repo_id here, not a
        # local file path.
        logger.info(f"Loading LLM model from GGUF: {llm_gguf_path}")
        self.llm = Llama.from_pretrained(
            repo_id=llm_gguf_path,
            filename="*Q4_K_M.gguf",
            n_gpu_layers=n_gpu_layers if use_gpu else 0,
            n_ctx=n_ctx,
            verbose=False,
            use_mlock=True,
            n_threads=None
        )

        # PPO inference is cheap; keeping it on CPU leaves GPU memory for the LLM.
        ppo_device = "cpu"
        logger.info(f"Loading PPO model on {ppo_device}")
        self.ppo_model = PPO.load(model_path, device=ppo_device)

        # Per-conversation state: normalized history plus the probability trace.
        self.conversation_states = {}

    def _normalize_history_format(self, history: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """Normalize history entries to consistent 'speaker'/'message' keys."""
        normalized_history = []
        for msg in history:
            # Accept either {'role', 'content'} or {'speaker', 'message'} entries.
            role = msg.get('role', msg.get('speaker', ''))
            content = msg.get('content', msg.get('message', ''))

            if role in ['user', 'customer']:
                speaker = 'user'
            elif role in ['assistant', 'sales_rep']:
                speaker = 'sales_rep'
            else:
                speaker = role

            normalized_history.append({
                'speaker': speaker,
                'message': content
            })

        return normalized_history

    def get_embedding(self, text: str) -> np.ndarray:
        """Get a 1024-d embedding for the text using the configured BGE model."""
        try:
            # bge-large-en-v1.5 accepts at most 512 tokens while bge-m3 accepts
            # 8192, so cap at the tokenizer's own limit rather than a fixed value.
            max_length = min(self.embedding_tokenizer.model_max_length, 8192)
            inputs = self.embedding_tokenizer(
                text,
                padding=True,
                truncation=True,
                return_tensors='pt',
                max_length=max_length
            ).to(self.device)

            with torch.no_grad():
                model_output = self.embedding_model(**inputs)

            embeddings = model_output.last_hidden_state
            attention_mask = inputs['attention_mask']

            # Mean pooling: average token embeddings, ignoring padding positions.
            input_mask_expanded = attention_mask.unsqueeze(-1).expand(embeddings.size()).float()
            sum_embeddings = torch.sum(embeddings * input_mask_expanded, 1)
            sum_mask = input_mask_expanded.sum(1)
            sum_mask = torch.clamp(sum_mask, min=1e-9)
            mean_embeddings = sum_embeddings / sum_mask

            # L2-normalize so downstream dot products behave like cosine similarity.
            embeddings = torch.nn.functional.normalize(mean_embeddings, p=2, dim=1)
            bge_embedding = embeddings.cpu().numpy()[0].astype(np.float32)
            logger.info(f"BGE embedding shape: {bge_embedding.shape}")

            # Pad or truncate to the 1024 dimensions the PPO policy was trained on.
            if len(bge_embedding) != 1024:
                logger.warning(f"Expected 1024 dimensions, got {len(bge_embedding)}")
                if len(bge_embedding) < 1024:
                    padded = np.zeros(1024, dtype=np.float32)
                    padded[:len(bge_embedding)] = bge_embedding
                    bge_embedding = padded
                else:
                    bge_embedding = bge_embedding[:1024]

            return bge_embedding

        except Exception as e:
            logger.error(f"Error getting embedding: {str(e)}")
            # Fall back to a zero vector so prediction can still proceed.
            return np.zeros(1024, dtype=np.float32)
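
    # Pooling note: the official BGE usage examples take the [CLS] token embedding
    # rather than a mean over all tokens. A minimal alternative sketch (assuming
    # the same `model_output` as above) would be:
    #
    #     cls_embedding = model_output.last_hidden_state[:, 0]
    #     cls_embedding = torch.nn.functional.normalize(cls_embedding, p=2, dim=1)
    #
    # Mean pooling is kept above so embeddings stay consistent with however the
    # PPO model's training features were produced.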

    def analyze_conversation_metrics(self, history: List[Dict[str, str]]) -> Dict[str, float]:
        """Analyze a conversation and extract key metrics using the LLM."""
        try:
            normalized_history = self._normalize_history_format(history)

            # Flatten the conversation into plain text for the prompt.
            conversation_text = ""
            for msg in normalized_history:
                speaker = msg.get('speaker', '')
                message = msg.get('message', '')
                conversation_text += f"{speaker}: {message}\n\n"

            prompt = f"""Analyze this sales conversation and rate each metric from 0.0 to 1.0:

customer_engagement:
sales_effectiveness:

Respond only with numbers in the format shown above.

Conversation:
{conversation_text}"""

            response = self.generate_llm_response(prompt, max_new_tokens=50)
            logger.debug(f"Metric analysis response: {response}")

            # Parse "metric_name: value" lines; keep the 0.5 default if parsing fails.
            lines = response.strip().split('\n')
            logger.debug(f"Parsed lines: {lines}")

            engagement = 0.5
            effectiveness = 0.5
            for line in lines:
                if 'customer_engagement' in line.lower():
                    try:
                        engagement = float(line.split(':')[-1].strip())
                        engagement = max(0.0, min(1.0, engagement))
                    except (ValueError, IndexError):
                        pass
                elif 'sales_effectiveness' in line.lower():
                    try:
                        effectiveness = float(line.split(':')[-1].strip())
                        effectiveness = max(0.0, min(1.0, effectiveness))
                    except (ValueError, IndexError):
                        pass

            return {
                'customer_engagement': engagement,
                'sales_effectiveness': effectiveness,
                'conversation_length': len(normalized_history),
                'outcome': 0.5,
                'progress': min(1.0, len(normalized_history) / 20)
            }

        except Exception as e:
            logger.error(f"Error analyzing conversation: {str(e)}")
            # Neutral defaults keep prediction usable when analysis fails.
            return {
                'customer_engagement': 0.5,
                'sales_effectiveness': 0.5,
                'conversation_length': len(history),
                'outcome': 0.5,
                'progress': min(1.0, len(history) / 20)
            }
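
    # For reference, a well-formed LLM reply to the metric prompt looks like
    # (illustrative, not captured output):
    #
    #     customer_engagement: 0.7
    #     sales_effectiveness: 0.6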

    def generate_llm_response(self, prompt: str, max_new_tokens: int = 2048) -> str:
        """Generate a completion using llama-cpp."""
        try:
            response = self.llm(
                prompt,
                max_tokens=max_new_tokens,
                temperature=0.001,  # near-greedy decoding for stable, parseable output
                top_p=0.95,
                repeat_penalty=1.1,
                stop=["User:", "Assistant:", "\n\n"]
            )

            # llama-cpp-python returns an OpenAI-style completion dict.
            generated_text = response['choices'][0]['text']
            return generated_text.strip()

        except Exception as e:
            logger.error(f"Error generating LLM response: {str(e)}")
            return "I apologize, but I encountered an error generating a response."
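
    # Instruction-tuned GGUF models can also be driven through llama.cpp's chat
    # API, which applies the model's own chat template. A hypothetical equivalent
    # (not used above) would be:
    #
    #     out = self.llm.create_chat_completion(
    #         messages=[{"role": "user", "content": prompt}],
    #         max_tokens=max_new_tokens,
    #     )
    #     text = out['choices'][0]['message']['content']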

    def create_state_vector(self,
                            embedding: np.ndarray,
                            metrics: Dict[str, float],
                            turn_number: int,
                            previous_probs: List[float]) -> np.ndarray:
        """Create the 1040-d state vector the PPO policy expects."""
        metric_values = np.array([
            metrics['customer_engagement'],
            metrics['sales_effectiveness'],
            metrics['conversation_length'],
            metrics['outcome'],
            metrics['progress']
        ], dtype=np.float32)

        turn_info = np.array([turn_number], dtype=np.float32)

        # Fixed-size window of the 10 most recent probabilities, zero-padded.
        padded_probs = np.zeros(10, dtype=np.float32)
        if previous_probs:
            recent_probs = previous_probs[-10:]
            padded_probs[:len(recent_probs)] = recent_probs

        if len(embedding) != 1024:
            logger.warning(f"Unexpected embedding size: {len(embedding)}. Expected 1024. Creating zero embedding.")
            embedding = np.zeros(1024, dtype=np.float32)

        combined = np.concatenate([
            embedding,
            metric_values,
            turn_info,
            padded_probs
        ])

        logger.info(f"State vector shape: {combined.shape} (expected: 1040)")
        return combined
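
    # State layout (1040 dims total):
    #   [0:1024]    conversation embedding
    #   [1024:1029] metrics (engagement, effectiveness, length, outcome, progress)
    #   [1029:1030] turn number
    #   [1030:1040] last 10 predicted probabilities, zero-padded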

    def predict_conversion(self, conversation_id: str, history: List[Dict[str, str]],
                           new_response: str) -> float:
        """Predict the conversion probability after appending new_response."""
        logger.info(f"Predicting conversion for conversation {conversation_id}")

        normalized_history = self._normalize_history_format(history)

        # Score the conversation as it would look after the candidate reply.
        updated_history = normalized_history.copy()
        updated_history.append({'speaker': 'sales_rep', 'message': new_response})

        full_text = " ".join([msg.get('message', '') for msg in updated_history])

        embedding = self.get_embedding(full_text)
        logger.info(f"Embedding shape: {embedding.shape}")

        metrics = self.analyze_conversation_metrics(updated_history)
        logger.info(f"Metrics: engagement={metrics['customer_engagement']:.2f}, effectiveness={metrics['sales_effectiveness']:.2f}")

        # One turn = one user message plus one sales_rep message.
        turn = len(updated_history) // 2

        if conversation_id in self.conversation_states:
            previous_probs = self.conversation_states[conversation_id]['probabilities']
        else:
            previous_probs = [0.5]

        state_vector = self.create_state_vector(embedding, metrics, turn, previous_probs)

        if isinstance(state_vector, torch.Tensor):
            state_vector = state_vector.cpu().numpy()
        state_vector = np.array(state_vector, dtype=np.float32)
        logger.info(f"Final state vector shape: {state_vector.shape}")

        try:
            # The PPO policy's action is interpreted as the conversion probability.
            action, _ = self.ppo_model.predict(state_vector, deterministic=True)

            if hasattr(action, 'item'):
                predicted_prob = float(action.item())
            elif isinstance(action, np.ndarray):
                predicted_prob = float(action[0])
            else:
                predicted_prob = float(action)

            predicted_prob = max(0.0, min(1.0, predicted_prob))

        except Exception as e:
            logger.error(f"Error during prediction: {str(e)}")
            predicted_prob = 0.5

        # Persist history and the probability trace for the next turn.
        self.conversation_states[conversation_id] = {
            'history': updated_history,
            'probabilities': previous_probs + [predicted_prob]
        }

        logger.info(f"Predicted conversion probability: {predicted_prob:.4f}")
        return predicted_prob
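
    # Minimal usage sketch (hypothetical values; `predictor` as constructed below):
    #
    #     prob = predictor.predict_conversion(
    #         conversation_id="demo",
    #         history=[{"role": "user", "content": "Do you offer annual billing?"}],
    #         new_response="Yes, and annual billing saves 20%. Want me to set it up?",
    #     )
    #
    # Each call appends to the stored probability trace for that conversation_id.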

    def generate_response(self, conversation_id: str, history: List[Dict[str, str]],
                          user_input: str, system_prompt: str = None) -> str:
        """Generate a reply with llama-cpp and append the conversion probability."""
        normalized_history = self._normalize_history_format(history)

        # Build a plain-text chat prompt in System/User/Assistant format.
        messages = []
        if system_prompt:
            messages.append(f"System: {system_prompt}\n")
        else:
            messages.append("System: You are a helpful sales assistant.\n")

        for msg in normalized_history:
            speaker = msg.get('speaker', '')
            message = msg.get('message', '')
            if speaker == 'user':
                messages.append(f"User: {message}\n")
            elif speaker == 'sales_rep':
                messages.append(f"Assistant: {message}\n")

        messages.append(f"User: {user_input}\n")
        messages.append("Assistant: ")

        prompt = "".join(messages)

        llm_response = self.generate_llm_response(prompt, max_new_tokens=2048)
        logger.debug(f"LLM response: {llm_response}")

        # Score the conversation including the new user turn and the reply.
        history_with_user = history.copy()
        history_with_user.append({'role': 'user', 'content': user_input})
        probability = self.predict_conversion(conversation_id, history_with_user, llm_response)

        return self.format_response_with_probability(llm_response, probability)

    def format_response_with_probability(self, response: str, probability: float) -> str:
        """Format response with conversion probability."""
        probability_pct = probability * 100

        # These cut-offs are intentionally narrow (0.35-0.38), presumably tuned to
        # the trained model's typical output range.
        if probability >= 0.38:
            indicator = "🟢 Conversion Highly Likely"
        elif probability >= 0.37:
            indicator = "🟡 Good Conversion Potential"
        elif probability >= 0.35:
            indicator = "🟠 Moderate Conversion Potential"
        else:
            indicator = "🔴 Conversion Unlikely"

        formatted_response = (
            f"{response}\n\n"
            f"---\n"
            f"{indicator} ({probability_pct:.1f}%)\n"
        )

        return formatted_response
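
    # Example output for probability=0.372 (illustrative):
    #
    #     <LLM reply text>
    #
    #     ---
    #     🟡 Good Conversion Potential (37.2%)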

    def format_prediction_result(self, probability: float) -> Dict[str, str]:
        """Format prediction result with status and suggestion."""
        probability_pct = probability * 100

        if probability >= 0.38:
            status = "🟢 Conversion Highly Likely"
            suggestion = "Follow up with specific next steps or a call to action."
        elif probability >= 0.37:
            status = "🟡 Good Conversion Potential"
            suggestion = "Address any remaining concerns and guide toward a decision."
        elif probability >= 0.35:
            status = "🟠 Moderate Conversion Potential"
            suggestion = "Focus on building value and addressing objections."
        else:
            status = "🔴 Conversion Unlikely"
            suggestion = "Reframe the conversation or qualify needs better."

        return {
            "probability": probability,
            "formatted_probability": f"{probability_pct:.1f}%",
            "status": status,
            "suggestion": suggestion
        }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Sales Conversion Predictor")
    parser.add_argument(
        "--model_path",
        type=str,
        default="/content/sales-conversion-model-reinf-learning/sales_conversion_model",
        help="Path to the trained PPO model zip file."
    )
    parser.add_argument(
        "--embedding_model_name",
        type=str,
        default="BAAI/bge-m3",
        help="Name of the Hugging Face embedding model (e.g., 'BAAI/bge-m3', 'BAAI/bge-large-en-v1.5')."
    )
    parser.add_argument(
        "--llm_gguf_path",
        type=str,
        default="unsloth/gemma-3-4b-it-GGUF",
        help="Hugging Face repo_id of the GGUF LLM (passed to Llama.from_pretrained)."
    )
    parser.add_argument(
        "--no_gpu",
        action="store_true",
        help="Disable GPU usage (use CPU only)."
    )
    parser.add_argument(
        "--n_gpu_layers",
        type=int,
        default=-1,
        help="Number of LLM layers to offload to GPU. -1 for all, 0 for none."
    )
    parser.add_argument(
        "--n_ctx",
        type=int,
        default=2048,
        help="Context window size for the LLM."
    )

    args = parser.parse_args()

    predictor = SalesConversionPredictor(
        model_path=args.model_path,
        embedding_model_name=args.embedding_model_name,
        llm_gguf_path=args.llm_gguf_path,
        use_gpu=not args.no_gpu,
        n_gpu_layers=args.n_gpu_layers,
        n_ctx=args.n_ctx,
        use_mini_embeddings=True
    )
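
    # Example of generating a reply directly (commented out; hypothetical inputs):
    #
    #     reply = predictor.generate_response(
    #         conversation_id="demo",
    #         history=[],
    #         user_input="Hi, what does your product do?",
    #     )
    #     print(reply)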

    # Smoke-test scenarios covering negative, positive, and neutral outcomes.
    scenarios = [
        {
            "id": "negative_outcome",
            "history": [
                {"role": "user", "content": "I'm looking for a CRM solution for my startup."},
                {"role": "assistant", "content": "I'd be happy to help you find the right CRM solution. What's the size of your team and what are your main requirements?"},
                {"role": "user", "content": "We're a team of 10 and need lead management and email automation."},
                {"role": "assistant", "content": "Our CRM offers excellent lead management and built-in email automation that would be perfect for a team of 10. Let me show you how it works."},
                {"role": "user", "content": "not interested, bye"}
            ],
            "response": "ok, thank you for the interest"
        },
        {
            "id": "positive_outcome",
            "history": [
                {"role": "user", "content": "I need a project management tool urgently."},
                {"role": "assistant", "content": "I can definitely help you with that! Our tool is designed for quick implementation. What's your main priority?"},
                {"role": "user", "content": "We need to track tasks and deadlines for 20 people."},
                {"role": "assistant", "content": "Perfect! Our solution handles that easily with real-time collaboration features. We can get you set up today with a free trial."},
                {"role": "user", "content": "That sounds great! What's the pricing?"}
            ],
            "response": "For a team of 20, it's $299/month with all features included. You get 14 days free to test everything. Shall I send you the signup link?"
        },
        {
            "id": "neutral_outcome",
            "history": [
                {"role": "user", "content": "Tell me about your software."},
                {"role": "assistant", "content": "Our software helps businesses manage their operations more efficiently. What specific area are you looking to improve?"},
                {"role": "user", "content": "Just browsing for now."}
            ],
            "response": "No problem! Feel free to explore our website for more information, and I'm here if you have any questions."
        }
    ]

    for scenario in scenarios:
        print(f"\n=== Testing Scenario: {scenario['id']} ===")

        probability = predictor.predict_conversion(
            conversation_id=scenario['id'],
            history=scenario['history'],
            new_response=scenario['response']
        )

        result = predictor.format_prediction_result(probability)

        print(f"Response: {scenario['response']}")
        print(f"Probability: {result['formatted_probability']}")
        print(f"Status: {result['status']}")
        print(f"Suggestion: {result['suggestion']}")
        print("-" * 50)