# csc525_retrieval_based_chatbot/response_quality_checker.py
import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity
from logger_config import config_logger
logger = config_logger(__name__)
if TYPE_CHECKING:
from chatbot_model import RetrievalChatbot
class ResponseQualityChecker:
"""Enhanced quality checking with dynamic thresholds."""
def __init__(
self,
chatbot: 'RetrievalChatbot',
confidence_threshold: float = 0.6,
diversity_threshold: float = 0.15,
min_response_length: int = 5,
        similarity_cap: float = 0.85  # Cap applied to pairwise similarities in the diversity calculation
):
self.confidence_threshold = confidence_threshold
self.diversity_threshold = diversity_threshold
self.min_response_length = min_response_length
self.similarity_cap = similarity_cap
self.chatbot = chatbot
        # Additional metric thresholds used when determining confidence
self.thresholds = {
'relevance': 0.35,
'length_score': 0.85,
'score_gap': 0.07
}
def check_response_quality(
self,
query: str,
responses: List[Tuple[str, float]]
) -> Dict[str, Any]:
"""
Evaluate the quality of responses based on various metrics.
Args:
query: The user's query
responses: List of (response_text, score) tuples
Returns:
Dict containing quality metrics and confidence assessment
"""
if not responses:
return {
'response_diversity': 0.0,
'query_response_relevance': 0.0,
'is_confident': False,
'top_score': 0.0,
'response_length_score': 0.0,
'top_3_score_gap': 0.0
}
# Calculate core metrics
metrics = {
'response_diversity': self.calculate_diversity(responses),
'query_response_relevance': self.calculate_relevance(query, responses),
'response_length_score': np.mean([
self._calculate_length_score(response) for response, _ in responses
]),
'top_score': responses[0][1],
'top_3_score_gap': self._calculate_score_gap([score for _, score in responses], top_n=3)
}
# Determine confidence using thresholds
metrics['is_confident'] = self._determine_confidence(metrics)
logger.info(f"Quality metrics: {metrics}")
return metrics
def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
"""Calculate relevance as weighted similarity between query and responses."""
if not responses:
return 0.0
# Get embeddings
query_embedding = self.encode_query(query)
response_embeddings = [self.encode_text(response) for response, _ in responses]
# Compute similarities with decreasing weights for later responses
similarities = cosine_similarity([query_embedding], response_embeddings)[0]
weights = np.array([1.0 / (i + 1) for i in range(len(similarities))])
return np.average(similarities, weights=weights)
def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
"""Calculate diversity with length normalization and similarity capping."""
if not responses:
return 0.0
embeddings = [self.encode_text(response) for response, _ in responses]
if len(embeddings) < 2:
return 1.0
# Calculate similarities and apply cap
similarity_matrix = cosine_similarity(embeddings)
similarity_matrix = np.minimum(similarity_matrix, self.similarity_cap)
# Apply length normalization
lengths = [len(resp[0].split()) for resp in responses]
length_ratios = np.array([min(a, b) / max(a, b) for a in lengths for b in lengths])
length_ratios = length_ratios.reshape(len(responses), len(responses))
# Combine factors with weights
adjusted_similarity = (similarity_matrix * 0.7 + length_ratios * 0.3)
        # Average pairwise similarity, excluding the diagonal (self-similarity)
        sum_similarities = np.sum(adjusted_similarity) - np.trace(adjusted_similarity)
        num_pairs = len(responses) * (len(responses) - 1)
        avg_similarity = sum_similarities / num_pairs if num_pairs > 0 else 0.0
        return 1 - avg_similarity
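    # Illustrative arithmetic (not part of the original code): with the default
    # similarity_cap of 0.85 and two near-duplicate, equal-length responses, every
    # adjusted entry is 0.7 * 0.85 + 0.3 * 1.0 = 0.895, so diversity = 1 - 0.895 = 0.105,
    # which falls below the default diversity_threshold of 0.15 and fails the check.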
def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
"""Determine confidence using primary and secondary conditions."""
# Primary conditions (must all be met)
primary_conditions = [
metrics['top_score'] >= self.confidence_threshold,
metrics['response_diversity'] >= self.diversity_threshold,
metrics['response_length_score'] >= self.thresholds['length_score']
]
# Secondary conditions (majority must be met)
secondary_conditions = [
metrics['query_response_relevance'] >= self.thresholds['relevance'],
metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold * 1.1)  # Top score clears a stricter bar
]
return all(primary_conditions) and sum(secondary_conditions) >= 2
def _calculate_length_score(self, response: str) -> float:
"""Calculate length score with penalty for very short or long responses."""
words = len(response.split())
if words < self.min_response_length:
return words / self.min_response_length
elif words > 50: # Penalty for very long responses
return min(1.0, 50 / words)
return 1.0
    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """Average drop between consecutive scores over the top_n gaps (0.0 if fewer than top_n + 1 scores)."""
        if len(scores) < top_n + 1:
            return 0.0
        gaps = [scores[i] - scores[i + 1] for i in range(min(len(scores) - 1, top_n))]
        return float(np.mean(gaps))
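    # Illustrative (hypothetical values): scores [0.72, 0.64, 0.58, 0.31] yield gaps of
    # 0.08, 0.06, and 0.27, an average of roughly 0.14, above the 0.07 score_gap threshold.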
def encode_text(self, text: str) -> np.ndarray:
"""Encode response text to embedding."""
embedding_tensor = self.chatbot.encode_responses([text])
embedding = embedding_tensor.numpy()[0].astype('float32')
return self._normalize_embedding(embedding)
def encode_query(self, query: str) -> np.ndarray:
"""Encode query text to embedding."""
embedding_tensor = self.chatbot.encode_query(query)
embedding = embedding_tensor.numpy()[0].astype('float32')
return self._normalize_embedding(embedding)
def _normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
"""Normalize embedding vector."""
norm = np.linalg.norm(embedding)
return embedding / norm if norm > 0 else embedding
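# -----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). ResponseQualityChecker relies on the
# chatbot's encode_query()/encode_responses() returning tensor-like objects with
# a .numpy() method, so the hypothetical stubs below stand in for the real
# RetrievalChatbot purely to exercise the checker end to end.
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    class _StubTensor:
        """Wraps a numpy array behind the .numpy() accessor the checker expects."""
        def __init__(self, array: np.ndarray):
            self._array = array
        def numpy(self) -> np.ndarray:
            return self._array

    class _StubChatbot:
        """Hypothetical stand-in for RetrievalChatbot; returns random embeddings."""
        def __init__(self, dim: int = 8, seed: int = 0):
            self._rng = np.random.default_rng(seed)
            self._dim = dim
        def encode_query(self, query: str) -> _StubTensor:
            return _StubTensor(self._rng.random((1, self._dim), dtype=np.float32))
        def encode_responses(self, texts: List[str]) -> _StubTensor:
            return _StubTensor(self._rng.random((len(texts), self._dim), dtype=np.float32))

    checker = ResponseQualityChecker(chatbot=_StubChatbot())
    candidates = [
        ("You can reset your password from the account settings page.", 0.72),
        ("Password resets are handled under account settings.", 0.64),
        ("Try checking the settings page for a reset option.", 0.58),
        ("I'm not sure, could you rephrase that?", 0.31),
    ]
    metrics = checker.check_response_quality("How do I reset my password?", candidates)
    print(metrics["is_confident"], metrics["top_score"])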