# csc525_retrieval_based_chatbot/response_quality_checker.py
import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity
from logger_config import config_logger
logger = config_logger(__name__)
if TYPE_CHECKING:
from tf_data_pipeline import TFDataPipeline
class ResponseQualityChecker:
"""
Enhanced quality checking that calculates:
- Relevance between query & responses
- Diversity among top responses
- Response length scoring
- Confidence determination based on multiple thresholds
"""
def __init__(
self,
data_pipeline: 'TFDataPipeline',
confidence_threshold: float = 0.45,
diversity_threshold: float = 0.15,
min_response_length: int = 5,
similarity_cap: float = 0.85,
):
"""
Args:
data_pipeline: Reference to TFDataPipeline for encoding
confidence_threshold: Minimum top_score for a 'confident' result
diversity_threshold: Minimum required diversity among top responses
min_response_length: Minimum words for a decent response
similarity_cap: Cap on pairwise similarity for diversity calc
"""
self.confidence_threshold = confidence_threshold
self.diversity_threshold = diversity_threshold
self.min_response_length = min_response_length
self.similarity_cap = similarity_cap
self.data_pipeline = data_pipeline
# Additional thresholds for more refined checks
self.thresholds = {
'relevance': 0.30, # Slightly relaxed
'length_score': 0.80, # Stricter length requirement
'score_gap': 0.05 # Gap between top scores
}
def check_response_quality(
self,
query: str,
responses: List[Tuple[str, float]]
) -> Dict[str, Any]:
"""
Evaluate the quality of a set of ranked responses for a given query.
Args:
query: The user's original query
responses: List of (response_text, score) sorted by descending score
Returns:
Dictionary of metrics, including 'is_confident' and others
"""
if not responses:
return {
'response_diversity': 0.0,
'query_response_relevance': 0.0,
'is_confident': False,
'top_score': 0.0,
'response_length_score': 0.0,
'top_3_score_gap': 0.0
}
        # 1) Calculate the core metrics
metrics = {}
metrics['response_diversity'] = self.calculate_diversity(responses)
metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
metrics['response_length_score'] = self._average_length_score(responses)
metrics['top_score'] = responses[0][1]
metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)
# 2) Determine confidence
metrics['is_confident'] = self._determine_confidence(metrics)
logger.info(f"Quality metrics: {metrics}")
return metrics
def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
"""
Compute an overall 'relevance' metric between the query and the top responses.
Uses an exponential transform on the similarity to penalize weaker matches.
"""
if not responses:
return 0.0
# Encode query and responses
query_emb = self.data_pipeline.encode_query(query)
resp_texts = [r for r, _ in responses]
resp_embs = self.data_pipeline.encode_responses(resp_texts)
# Normalize embeddings
query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12)
resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12
resp_embs = resp_embs / resp_norms
# Cosine similarity
sims = cosine_similarity([query_emb], resp_embs)[0]
# Exponential transform: higher sims remain close to 1, lower sims drop quickly
sims = np.exp(sims - 1.0)
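        # For intuition (illustrative values): sim 1.0 -> exp(0.0) = 1.00,
        # sim 0.7 -> exp(-0.3) ~= 0.74, sim 0.4 -> exp(-0.6) ~= 0.55,
        # so weak matches are discounted faster than strong ones.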
# Weighted average: give heavier weighting to higher-ranked items
weights = np.exp(-np.arange(len(sims)) / 2.0)
weighted_avg = np.average(sims, weights=weights)
return float(weighted_avg)
def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
"""
Calculate how 'different' the top responses are from each other.
Diversity = 1 - avg_cosine_similarity (capped).
"""
if len(responses) < 2:
return 1.0 # Single response is trivially 'unique'
resp_texts = [r for r, _ in responses]
embs = self.data_pipeline.encode_responses(resp_texts)
# Pairwise similarity
sim_matrix = cosine_similarity(embs, embs)
np.fill_diagonal(sim_matrix, 0.0)
# Cap similarity to avoid outliers
sim_matrix = np.minimum(sim_matrix, self.similarity_cap)
# Mean off-diagonal similarity
sum_sims = np.sum(sim_matrix)
num_pairs = len(resp_texts) * (len(resp_texts) - 1)
avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0
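        # Illustrative example: raw pairwise sims (0.9, 0.7, 0.95) are capped to
        # (0.85, 0.70, 0.85), giving avg_sim = 0.80 and a diversity of 0.20.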
# Invert to get diversity
        return float(1.0 - avg_sim)
def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
"""
Decide if we're 'confident' based on multiple metric thresholds.
"""
primary_conditions = [
metrics['top_score'] >= self.confidence_threshold,
metrics['response_diversity'] >= self.diversity_threshold,
metrics['response_length_score'] >= self.thresholds['length_score']
]
secondary_conditions = [
metrics['query_response_relevance'] >= self.thresholds['relevance'],
metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
metrics['top_score'] >= (self.confidence_threshold + 0.05) # Extra buffer
]
# Must pass all primary checks, and at least 2 of the 3 secondary
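        # Illustrative example (hypothetical metrics): top_score=0.52, diversity=0.30,
        # and length_score=0.90 pass every primary check; relevance=0.33, score_gap=0.06,
        # and top_score >= 0.50 pass all three secondary checks, so the result is confident.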
return all(primary_conditions) and (sum(secondary_conditions) >= 2)
def _average_length_score(self, responses: List[Tuple[str, float]]) -> float:
"""
Compute an average length score across all responses.
"""
length_scores = []
for response, _ in responses:
length_scores.append(self._length_score(response))
return float(np.mean(length_scores)) if length_scores else 0.0
def _length_score(self, text: str) -> float:
"""
Calculate how well the text meets our length requirement.
Scores 1.0 if text is >= min_response_length and not too long,
else it scales down.
"""
words = len(text.split())
if words < self.min_response_length:
return words / float(self.min_response_length)
elif words > 60:
return max(0.5, 60.0 / words) # Slight penalty for very long
return 1.0
def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
"""
Calculate the average gap between consecutive scores in the top N.
"""
if len(scores) < 2:
return 0.0
top_n = min(len(scores), top_n)
gaps = []
for i in range(top_n - 1):
gaps.append(scores[i] - scores[i + 1])
return float(np.mean(gaps)) if gaps else 0.0
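
# The sketch below is a minimal, self-contained usage example. It swaps the real
# TFDataPipeline for a hypothetical _StubPipeline (not part of this project) that
# returns random unit-norm embeddings, so the printed numbers are meaningless;
# it only demonstrates how ResponseQualityChecker is called.
if __name__ == "__main__":
    class _StubPipeline:
        """Hypothetical stand-in for TFDataPipeline, used only for this demo."""

        def __init__(self, dim: int = 64, seed: int = 0):
            self.rng = np.random.default_rng(seed)
            self.dim = dim

        def encode_query(self, query: str) -> np.ndarray:
            # Random unit-norm query embedding
            vec = self.rng.normal(size=self.dim)
            return vec / np.linalg.norm(vec)

        def encode_responses(self, texts: List[str]) -> np.ndarray:
            # Random unit-norm response embeddings, one row per text
            mat = self.rng.normal(size=(len(texts), self.dim))
            return mat / np.linalg.norm(mat, axis=1, keepdims=True)

    checker = ResponseQualityChecker(data_pipeline=_StubPipeline())
    demo_responses = [
        ("You can reset your password from the account settings page.", 0.52),
        ("Password resets are handled under account settings.", 0.47),
        ("Try turning the device off and on again.", 0.33),
    ]
    demo_metrics = checker.check_response_quality("How do I reset my password?", demo_responses)
    print(demo_metrics)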