# opensource_inference.py (sales-conversion-model-reinf-learning)
import argparse
import os
import json
import numpy as np
import torch
from typing import List, Dict, Optional, Any
from transformers import (
AutoTokenizer,
AutoModel
)
from stable_baselines3 import PPO
from llama_cpp import Llama
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SalesConversionPredictor:
"""Sales conversion prediction class using Hugging Face models and llama.cpp"""
    def __init__(self,
                 model_path: str,
                 embedding_model_name: str = "BAAI/bge-large-en-v1.5",
                 llm_gguf_path: str = "path/to/your/llama-3.2-1b-instruct.gguf",
                 use_gpu: bool = True,
                 n_gpu_layers: int = -1,  # -1 for all layers on GPU
                 n_ctx: int = 2048,       # Context window size
                 use_mini_embeddings: bool = True):
        """Initialize with Hugging Face embeddings and llama.cpp LLM"""
# Set device for embeddings
self.device = torch.device("cuda" if torch.cuda.is_available() and use_gpu else "cpu")
logger.info(f"Using device: {self.device}")
        # Initialize the embedding model (e.g. BAAI/bge-large-en-v1.5 or BAAI/bge-m3)
logger.info(f"Loading embedding model: {embedding_model_name}")
self.embedding_tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
self.embedding_model = AutoModel.from_pretrained(embedding_model_name).to(self.device)
# Check if model was trained with mini embeddings
self.use_mini_embeddings = use_mini_embeddings
        self.embedding_dim = 1024  # Both bge-large-en-v1.5 and bge-m3 output 1024-dimensional embeddings
        # Initialize the LLM via llama-cpp: accept either a local GGUF file or a Hugging Face repo id
        logger.info(f"Loading LLM model from GGUF: {llm_gguf_path}")
        llama_kwargs = dict(
            n_gpu_layers=n_gpu_layers if use_gpu else 0,
            n_ctx=n_ctx,
            verbose=False,
            use_mlock=True,   # Keep model in RAM
            n_threads=None    # Use all available threads
        )
        if os.path.isfile(llm_gguf_path):
            self.llm = Llama(model_path=llm_gguf_path, **llama_kwargs)
        else:
            # Treat the value as a Hub repo id and download a quantized GGUF file
            self.llm = Llama.from_pretrained(
                repo_id=llm_gguf_path,
                filename="*Q4_K_M.gguf",
                **llama_kwargs
            )
# Load the trained PPO model (force CPU for PPO as recommended)
ppo_device = "cpu"
logger.info(f"Loading PPO model on {ppo_device}")
self.ppo_model = PPO.load(model_path, device=ppo_device)
# Store conversation states
self.conversation_states = {}
def _normalize_history_format(self, history: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Normalize history format to ensure consistency"""
normalized_history = []
for msg in history:
# Extract role/speaker
role = msg.get('role', msg.get('speaker', ''))
# Extract content/message
content = msg.get('content', msg.get('message', ''))
# Map role to expected format for the model
if role in ['user', 'customer']:
speaker = 'user'
elif role in ['assistant', 'sales_rep']:
speaker = 'sales_rep'
else:
speaker = role # Keep as is
normalized_history.append({
'speaker': speaker,
'message': content
})
return normalized_history
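    # For example, normalization maps OpenAI-style messages onto the trainer's schema:
    #   {'role': 'user', 'content': 'Hi'}           -> {'speaker': 'user', 'message': 'Hi'}
    #   {'role': 'assistant', 'content': 'Hello!'}  -> {'speaker': 'sales_rep', 'message': 'Hello!'}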
def get_embedding(self, text: str) -> np.ndarray:
"""Get embedding for text using BAAI/bge-large-en-v1.5"""
try:
# Tokenize input
inputs = self.embedding_tokenizer(
text,
padding=True,
truncation=True,
return_tensors='pt',
max_length=8192
).to(self.device)
# Get model outputs
with torch.no_grad():
model_output = self.embedding_model(**inputs)
# Get sentence embeddings from the model (mean pooling)
embeddings = model_output.last_hidden_state
attention_mask = inputs['attention_mask']
# Apply mean pooling
input_mask_expanded = attention_mask.unsqueeze(-1).expand(embeddings.size()).float()
sum_embeddings = torch.sum(embeddings * input_mask_expanded, 1)
sum_mask = input_mask_expanded.sum(1)
# Avoid division by zero
sum_mask = torch.clamp(sum_mask, min=1e-9)
mean_embeddings = sum_embeddings / sum_mask
# Normalize embeddings
embeddings = torch.nn.functional.normalize(mean_embeddings, p=2, dim=1)
# Move to CPU and convert to numpy
bge_embedding = embeddings.cpu().numpy()[0].astype(np.float32)
# BGE-large outputs 1024 dimensions by default
logger.info(f"BGE embedding shape: {bge_embedding.shape}")
# Ensure we have exactly 1024 dimensions
if len(bge_embedding) != 1024:
logger.warning(f"Expected 1024 dimensions, got {len(bge_embedding)}")
# Pad or truncate to 1024
if len(bge_embedding) < 1024:
padded = np.zeros(1024, dtype=np.float32)
padded[:len(bge_embedding)] = bge_embedding
bge_embedding = padded
else:
bge_embedding = bge_embedding[:1024]
return bge_embedding
except Exception as e:
logger.error(f"Error getting embedding: {str(e)}")
# Return zeros as fallback with expected dimensions
return np.zeros(1024, dtype=np.float32)
def analyze_conversation_metrics(self, history: List[Dict[str, str]]) -> Dict[str, float]:
"""Analyze conversation to extract key metrics using LLM"""
try:
# Normalize history format first
normalized_history = self._normalize_history_format(history)
# Format conversation for analysis
conversation_text = ""
for msg in normalized_history:
speaker = msg.get('speaker', '')
message = msg.get('message', '')
conversation_text += f"{speaker}: {message}\n\n"
# Create prompt for metrics analysis
prompt = f"""Analyze this sales conversation and rate each metric from 0.0 to 1.0:
customer_engagement:
sales_effectiveness:
Respond only with numbers in the format shown above.
Conversation:
{conversation_text}"""
# Get analysis from LLM
response = self.generate_llm_response(prompt, max_new_tokens=50)
print("response", response)
# Parse metrics
lines = response.strip().split('\n')
print("lines", lines)
engagement = 0.5
effectiveness = 0.5
            for line in lines:
                if 'customer_engagement' in line.lower():
                    try:
                        engagement = float(line.split(':')[-1].strip())
                        # Clamp to [0, 1]
                        engagement = max(0.0, min(1.0, engagement))
                    except ValueError:
                        pass  # Keep the default if the LLM output is not a number
                elif 'sales_effectiveness' in line.lower():
                    try:
                        effectiveness = float(line.split(':')[-1].strip())
                        # Clamp to [0, 1]
                        effectiveness = max(0.0, min(1.0, effectiveness))
                    except ValueError:
                        pass  # Keep the default if the LLM output is not a number
return {
'customer_engagement': engagement,
'sales_effectiveness': effectiveness,
'conversation_length': len(normalized_history),
'outcome': 0.5, # Unknown at inference time
'progress': min(1.0, len(normalized_history) / 20)
}
except Exception as e:
logger.error(f"Error analyzing conversation: {str(e)}")
# Return default values
return {
'customer_engagement': 0.5,
'sales_effectiveness': 0.5,
'conversation_length': len(history),
'outcome': 0.5,
'progress': min(1.0, len(history) / 20)
}
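    # The parser above expects the LLM to answer in the same shape as the prompt, e.g.:
    #   customer_engagement: 0.7
    #   sales_effectiveness: 0.6
    # Anything that does not parse as a float falls back to the 0.5 defaults.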
def generate_llm_response(self, prompt: str, max_new_tokens: int = 2048) -> str:
"""Generate response using llama-cpp"""
try:
# Generate response
response = self.llm(
prompt,
max_tokens=max_new_tokens,
temperature=0.001,
top_p=0.95,
repeat_penalty=1.1,
stop=["User:", "Assistant:", "\n\n"]
)
# Extract generated text
generated_text = response['choices'][0]['text']
# Clean up the response
generated_text = generated_text.strip()
return generated_text
except Exception as e:
logger.error(f"Error generating LLM response: {str(e)}")
return "I apologize, but I encountered an error generating a response."
def create_state_vector(self,
embedding: np.ndarray,
metrics: Dict[str, float],
turn_number: int,
previous_probs: List[float]) -> np.ndarray:
"""Create state vector for model input"""
# Create metric array (ensure all 5 metrics are included)
metric_values = np.array([
metrics['customer_engagement'],
metrics['sales_effectiveness'],
metrics['conversation_length'],
metrics['outcome'],
metrics['progress']
], dtype=np.float32)
# Create turn info
turn_info = np.array([turn_number], dtype=np.float32)
# Pad probability history
padded_probs = np.zeros(10, dtype=np.float32)
if previous_probs:
# Handle the case where previous_probs might have more than 10 elements
recent_probs = previous_probs[-10:] if len(previous_probs) > 10 else previous_probs
padded_probs[:len(recent_probs)] = recent_probs
# Keep original 1024-dimensional embedding without expanding
if len(embedding) != 1024:
logger.warning(f"Unexpected embedding size: {len(embedding)}. Expected 1024. Creating zero embedding.")
embedding = np.zeros(1024, dtype=np.float32)
# Total expected: 1024 + 5 + 1 + 10 = 1040
combined = np.concatenate([
embedding, # 1024 dimensions
metric_values, # 5 dimensions
turn_info, # 1 dimension
padded_probs # 10 dimensions
])
logger.info(f"State vector shape: {combined.shape} (expected: 1040)")
return combined
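    # Resulting state vector layout (1040 dimensions total):
    #   [0:1024]     conversation embedding
    #   [1024:1029]  metrics (engagement, effectiveness, length, outcome, progress)
    #   [1029]       turn number
    #   [1030:1040]  padded history of previously predicted probabilities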
def predict_conversion(self, conversation_id: str, history: List[Dict[str, str]],
new_response: str) -> float:
"""Predict conversion probability for a conversation"""
logger.info(f"Predicting conversion for conversation {conversation_id}")
# Normalize history format
normalized_history = self._normalize_history_format(history)
# Update history with new response
updated_history = normalized_history.copy()
updated_history.append({'speaker': 'sales_rep', 'message': new_response})
# Get full conversation text for embedding
full_text = " ".join([msg.get('message', '') for msg in updated_history])
# Get embedding (1024 dimensions)
embedding = self.get_embedding(full_text)
logger.info(f"Embedding shape: {embedding.shape}")
# Analyze conversation with updated history
metrics = self.analyze_conversation_metrics(updated_history)
logger.info(f"Metrics: engagement={metrics['customer_engagement']:.2f}, effectiveness={metrics['sales_effectiveness']:.2f}")
# Get turn number (each conversation turn includes user + assistant)
turn = len(updated_history) // 2
# Get previous probabilities
if conversation_id in self.conversation_states:
previous_probs = self.conversation_states[conversation_id]['probabilities']
else:
previous_probs = [0.5] # Initial probability
# Create state vector
state_vector = self.create_state_vector(embedding, metrics, turn, previous_probs)
# Convert to numpy array if it's not already
if isinstance(state_vector, torch.Tensor):
state_vector = state_vector.cpu().numpy()
# Ensure it's a numpy array
state_vector = np.array(state_vector, dtype=np.float32)
# Log the final shape
logger.info(f"Final state vector shape: {state_vector.shape}")
# Predict using PPO model
try:
# Fix deprecation warning by extracting scalar properly
action, _ = self.ppo_model.predict(state_vector, deterministic=True)
# Extract the scalar value
if hasattr(action, 'item'):
predicted_prob = float(action.item())
elif isinstance(action, np.ndarray):
predicted_prob = float(action[0])
else:
predicted_prob = float(action)
# Ensure probability is between 0 and 1
predicted_prob = max(0.0, min(1.0, predicted_prob))
except Exception as e:
logger.error(f"Error during prediction: {str(e)}")
# Fallback prediction
predicted_prob = 0.5
# Update state
self.conversation_states[conversation_id] = {
'history': updated_history,
'probabilities': previous_probs + [predicted_prob]
}
logger.info(f"Predicted conversion probability: {predicted_prob:.4f}")
return predicted_prob
    def generate_response(self, conversation_id: str, history: List[Dict[str, str]],
                          user_input: str, system_prompt: Optional[str] = None) -> str:
"""Generate a response using llama-cpp and add conversion probability"""
# Normalize history format
normalized_history = self._normalize_history_format(history)
# Format conversation for the LLM
messages = []
# Add system prompt if provided
if system_prompt:
messages.append(f"System: {system_prompt}\n")
else:
messages.append("System: You are a helpful sales assistant.\n")
# Add conversation history
for msg in normalized_history:
speaker = msg.get('speaker', '')
message = msg.get('message', '')
if speaker == 'user':
messages.append(f"User: {message}\n")
elif speaker == 'sales_rep':
messages.append(f"Assistant: {message}\n")
# Add the latest user input
messages.append(f"User: {user_input}\n")
messages.append("Assistant: ")
# Create prompt
prompt = "".join(messages)
        # Generate LLM response
        llm_response = self.generate_llm_response(prompt, max_new_tokens=2048)
        logger.debug(f"LLM response: {llm_response}")
# Add user message to history for prediction
history_with_user = history.copy()
history_with_user.append({'role': 'user', 'content': user_input})
# Predict conversion probability
probability = self.predict_conversion(conversation_id, history_with_user, llm_response)
# Format response with probability
formatted_response = self.format_response_with_probability(llm_response, probability)
return formatted_response
def format_response_with_probability(self, response: str, probability: float) -> str:
"""Format response with conversion probability"""
probability_pct = probability * 100
if probability >= 0.38:
indicator = "🟢 Conversion Highly Likely"
elif probability >= 0.37:
indicator = "🟡 Good Conversion Potential"
elif probability >= 0.35:
indicator = "🟠 Moderate Conversion Potential"
else:
indicator = "🔴 Conversion Unlikely"
formatted_response = (
f"{response}\n\n"
f"---\n"
f"{indicator} ({probability_pct:.1f}%)\n"
)
return formatted_response
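    # Example of the formatted output for a probability of 0.42:
    #   <LLM response text>
    #
    #   ---
    #   🟢 Conversion Highly Likely (42.0%)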
    def format_prediction_result(self, probability: float) -> Dict[str, Any]:
"""Format prediction result with status and suggestion"""
probability_pct = probability * 100
if probability >= 0.38:
status = "🟢 Conversion Highly Likely"
suggestion = "Follow up with specific next steps or a call to action."
elif probability >= 0.37:
status = "🟡 Good Conversion Potential"
suggestion = "Address any remaining concerns and guide toward a decision."
elif probability >= 0.35:
status = "🟠 Moderate Conversion Potential"
suggestion = "Focus on building value and addressing objections."
else:
status = "🔴 Conversion Unlikely"
suggestion = "Reframe the conversation or qualify needs better."
return {
"probability": probability,
"formatted_probability": f"{probability_pct:.1f}%",
"status": status,
"suggestion": suggestion
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Sales Conversion Predictor")
parser.add_argument(
"--model_path",
type=str,
default="/content/sales-conversion-model-reinf-learning/sales_conversion_model",
help="Path to the trained PPO model zip file."
)
parser.add_argument(
"--embedding_model_name",
type=str,
default="BAAI/bge-m3", # Defaulting to bge-m3 as per example
help="Name of the Hugging Face embedding model (e.g., 'BAAI/bge-m3', 'BAAI/bge-large-en-v1.5')."
)
    parser.add_argument(
        "--llm_gguf_path",
        type=str,
        default="unsloth/gemma-3-4b-it-GGUF",  # Defaulting to a repo id, as in the example
        help="Path to a local GGUF model file or a Hugging Face repo_id containing GGUF files."
    )
parser.add_argument(
"--no_gpu",
action="store_true",
help="Disable GPU usage (use CPU only)."
)
parser.add_argument(
"--n_gpu_layers",
type=int,
default=-1, # Default to all layers on GPU for llama.cpp
help="Number of LLM layers to offload to GPU. -1 for all, 0 for none."
)
parser.add_argument(
"--n_ctx",
type=int,
default=2048,
help="Context window size for the LLM."
)
args = parser.parse_args()
# Initialize predictor with GGUF model
predictor = SalesConversionPredictor(
model_path=args.model_path,
embedding_model_name=args.embedding_model_name,
llm_gguf_path=args.llm_gguf_path,
use_gpu=not args.no_gpu,
n_gpu_layers=args.n_gpu_layers,
n_ctx=args.n_ctx,
        use_mini_embeddings=True  # Kept for compatibility with the original training setup; the PPO model
                                  # should match this. The embedding dim is currently fixed at 1024 in this code.
)
# Test with different conversation scenarios
scenarios = [
{
"id": "negative_outcome",
"history": [
{"role": "user", "content": "I'm looking for a CRM solution for my startup."},
{"role": "assistant", "content": "I'd be happy to help you find the right CRM solution. What's the size of your team and what are your main requirements?"},
{"role": "user", "content": "We're a team of 10 and need lead management and email automation."},
{"role": "assistant", "content": "Our CRM offers excellent lead management and built-in email automation that would be perfect for a team of 10. Let me show you how it works."},
{"role": "user", "content": "not interested, bye"}
],
"response": "ok, thank you for the interest"
},
{
"id": "positive_outcome",
"history": [
{"role": "user", "content": "I need a project management tool urgently."},
{"role": "assistant", "content": "I can definitely help you with that! Our tool is designed for quick implementation. What's your main priority?"},
{"role": "user", "content": "We need to track tasks and deadlines for 20 people."},
{"role": "assistant", "content": "Perfect! Our solution handles that easily with real-time collaboration features. We can get you set up today with a free trial."},
{"role": "user", "content": "That sounds great! What's the pricing?"}
],
"response": "For a team of 20, it's $299/month with all features included. You get 14 days free to test everything. Shall I send you the signup link?"
},
{
"id": "neutral_outcome",
"history": [
{"role": "user", "content": "Tell me about your software."},
{"role": "assistant", "content": "Our software helps businesses manage their operations more efficiently. What specific area are you looking to improve?"},
{"role": "user", "content": "Just browsing for now."}
],
"response": "No problem! Feel free to explore our website for more information, and I'm here if you have any questions."
}
]
# Test each scenario
for scenario in scenarios:
print(f"\n=== Testing Scenario: {scenario['id']} ===")
# Predict conversion probability
probability = predictor.predict_conversion(
conversation_id=scenario['id'],
history=scenario['history'],
new_response=scenario['response']
)
# Get formatted result
result = predictor.format_prediction_result(probability)
# Print results
print(f"Response: {scenario['response']}")
print(f"Probability: {result['formatted_probability']}")
print(f"Status: {result['status']}")
print(f"Suggestion: {result['suggestion']}")
print("-" * 50)