import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING

from sklearn.metrics.pairwise import cosine_similarity

from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from tf_data_pipeline import TFDataPipeline


class ResponseQualityChecker:
    """
    Enhanced quality checking that calculates:
    - Relevance between query & responses
    - Diversity among top responses
    - Response length scoring
    - Confidence determination based on multiple thresholds
    """

    def __init__(
        self,
        data_pipeline: 'TFDataPipeline',
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85,
    ):
        """
        Args:
            data_pipeline: Reference to TFDataPipeline for encoding
            confidence_threshold: Minimum top_score for a 'confident' result
            diversity_threshold: Minimum required diversity among top responses
            min_response_length: Minimum words for a decent response
            similarity_cap: Cap on pairwise similarity for the diversity calculation
        """
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.data_pipeline = data_pipeline

        # Additional thresholds for more refined checks
        self.thresholds = {
            'relevance': 0.30,     # Slightly relaxed
            'length_score': 0.80,  # Stricter length requirement
            'score_gap': 0.05      # Gap between top scores
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a set of ranked responses for a given query.

        Args:
            query: The user's original query
            responses: List of (response_text, score) sorted by descending score

        Returns:
            Dictionary of metrics, including 'is_confident' and others
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # 1) Calculate the individual quality metrics
        metrics = {}
        metrics['response_diversity'] = self.calculate_diversity(responses)
        metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
        metrics['response_length_score'] = self._average_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)

        # 2) Determine confidence
        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics

    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an overall 'relevance' metric between the query and the top responses.
        Uses an exponential transform on the similarity to penalize weaker matches.
        """
        if not responses:
            return 0.0

        # Encode query and responses
        query_emb = self.data_pipeline.encode_query(query)
        resp_texts = [r for r, _ in responses]
        resp_embs = self.data_pipeline.encode_responses(resp_texts)

        # Normalize embeddings
        query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12)
        resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12
        resp_embs = resp_embs / resp_norms

        # Cosine similarity between the query and each response
        sims = cosine_similarity([query_emb], resp_embs)[0]

        # Exponential transform: higher sims remain close to 1, lower sims drop quickly
        sims = np.exp(sims - 1.0)

        # Weighted average: give heavier weighting to higher-ranked items
        weights = np.exp(-np.arange(len(sims)) / 2.0)
        weighted_avg = np.average(sims, weights=weights)
        return float(weighted_avg)

    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """
        Calculate how 'different' the top responses are from each other.
        Diversity = 1 - average pairwise cosine similarity (capped at similarity_cap).
        """
        if len(responses) < 2:
            return 1.0  # A single response is trivially 'unique'

        resp_texts = [r for r, _ in responses]
        embs = self.data_pipeline.encode_responses(resp_texts)

        # Pairwise similarity, excluding self-similarity on the diagonal
        sim_matrix = cosine_similarity(embs, embs)
        np.fill_diagonal(sim_matrix, 0.0)

        # Cap similarity to avoid outliers dominating the average
        sim_matrix = np.minimum(sim_matrix, self.similarity_cap)

        # Mean off-diagonal similarity
        sum_sims = np.sum(sim_matrix)
        num_pairs = len(resp_texts) * (len(resp_texts) - 1)
        avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0

        # Invert to get diversity
        return float(1.0 - avg_sim)

    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """
        Decide if we're 'confident' based on multiple metric thresholds.
        All primary conditions must hold, plus at least 2 of the 3 secondary conditions.
        """
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]

        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold + 0.05)  # Extra buffer
        ]

        # Must pass all primary checks, and at least 2 of the 3 secondary checks
        return all(primary_conditions) and (sum(secondary_conditions) >= 2)

    def _average_length_score(self, responses: List[Tuple[str, float]]) -> float:
        """
        Compute the average length score across all responses.
        """
        length_scores = [self._length_score(response) for response, _ in responses]
        return float(np.mean(length_scores)) if length_scores else 0.0

    def _length_score(self, text: str) -> float:
        """
        Score how well the text meets the length requirement.
        Returns 1.0 for texts between min_response_length and 60 words;
        shorter texts scale down linearly and very long texts are mildly penalized.
        """
        words = len(text.split())
        if words < self.min_response_length:
            return words / float(self.min_response_length)
        elif words > 60:
            return max(0.5, 60.0 / words)  # Slight penalty for very long responses
        return 1.0

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """
        Calculate the average gap between consecutive scores among the top N.
        """
        if len(scores) < 2:
            return 0.0

        top_n = min(len(scores), top_n)
        gaps = [scores[i] - scores[i + 1] for i in range(top_n - 1)]
        return float(np.mean(gaps)) if gaps else 0.0
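

# --- Usage sketch (illustrative only) ---------------------------------------
# A minimal, hedged example of how ResponseQualityChecker might be exercised.
# `_StubPipeline` below is a hypothetical stand-in for TFDataPipeline: it is not
# part of this module's API and exists only so the sketch can run without the
# real encoder. Real usage would pass an actual TFDataPipeline instance that
# provides encode_query() and encode_responses().
if __name__ == "__main__":
    class _StubPipeline:
        """Hypothetical pipeline stub returning small pseudo-embeddings."""

        def __init__(self, dim: int = 8):
            self._dim = dim

        def encode_query(self, query: str) -> np.ndarray:
            # Pseudo-embedding keyed on the text hash (illustration only)
            rng = np.random.default_rng(abs(hash(query)) % (2 ** 32))
            return rng.normal(size=self._dim)

        def encode_responses(self, texts: List[str]) -> np.ndarray:
            return np.stack([self.encode_query(t) for t in texts])

    checker = ResponseQualityChecker(data_pipeline=_StubPipeline())
    ranked = [
        ("You can reset your password from the account settings page.", 0.62),
        ("Try the 'forgot password' link on the login screen.", 0.55),
        ("Passwords must be at least eight characters long.", 0.41),
    ]
    result = checker.check_response_quality("How do I reset my password?", ranked)
    print(result["is_confident"], result["top_score"])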