import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity

from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from tf_data_pipeline import TFDataPipeline


class ResponseQualityChecker:
    """
    Enhanced quality checking that calculates:
    - Relevance between query & responses
    - Diversity among top responses
    - Response length scoring
    - Confidence determination based on multiple thresholds
    """

    def __init__(
        self,
        data_pipeline: 'TFDataPipeline',
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85,
    ):
        """
        Args:
            data_pipeline: Reference to TFDataPipeline for encoding
            confidence_threshold: Minimum top_score for a 'confident' result
            diversity_threshold: Minimum required diversity among top responses
            min_response_length: Minimum words for a decent response
            similarity_cap: Cap on pairwise similarity for the diversity calculation
        """
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.data_pipeline = data_pipeline

        # Additional thresholds for more refined checks
        self.thresholds = {
            'relevance': 0.30,     # Slightly relaxed
            'length_score': 0.80,  # Stricter length requirement
            'score_gap': 0.05      # Required gap between consecutive top scores
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a set of ranked responses for a given query.

        Args:
            query: The user's original query
            responses: List of (response_text, score) sorted by descending score

        Returns:
            Dictionary of metrics, including 'is_confident' and others
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # 1) Calculate the core quality metrics
        metrics = {}
        metrics['response_diversity'] = self.calculate_diversity(responses)
        metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
        metrics['response_length_score'] = self._average_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)

        # 2) Determine confidence
        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics
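
    # Illustrative (assumed) output shape only, not values from a real pipeline run:
    # {'response_diversity': 0.41, 'query_response_relevance': 0.58,
    #  'response_length_score': 0.93, 'top_score': 0.62,
    #  'top_3_score_gap': 0.06, 'is_confident': True}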

    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an overall 'relevance' metric between the query and the top responses.
        Uses an exponential transform on the similarity to penalize weaker matches.
        """
        if not responses:
            return 0.0

        # Encode query and responses
        query_emb = self.data_pipeline.encode_query(query)
        resp_texts = [r for r, _ in responses]
        resp_embs = self.data_pipeline.encode_responses(resp_texts)

        # Normalize embeddings
        query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12)
        resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12
        resp_embs = resp_embs / resp_norms

        # Cosine similarity
        sims = cosine_similarity([query_emb], resp_embs)[0]

        # Exponential transform: higher sims remain close to 1, lower sims drop quickly
        sims = np.exp(sims - 1.0)

        # Weighted average: give heavier weighting to higher-ranked items
        weights = np.exp(-np.arange(len(sims)) / 2.0)
        weighted_avg = np.average(sims, weights=weights)
        return float(weighted_avg)
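
    # Illustrative arithmetic only (assumed similarity values, not from the real
    # encoder): for raw cosine similarities [0.9, 0.6, 0.3], the transform
    # np.exp(sims - 1.0) gives roughly [0.90, 0.67, 0.50], the rank weights
    # np.exp(-np.arange(3) / 2.0) are roughly [1.00, 0.61, 0.37], and the
    # weighted average works out to about 0.76, dominated by the top-ranked hit.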

    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """
        Calculate how 'different' the top responses are from each other.
        Diversity = 1 - avg_cosine_similarity (capped).
        """
        if len(responses) < 2:
            return 1.0  # Single response is trivially 'unique'

        resp_texts = [r for r, _ in responses]
        embs = self.data_pipeline.encode_responses(resp_texts)

        # Pairwise similarity
        sim_matrix = cosine_similarity(embs, embs)
        np.fill_diagonal(sim_matrix, 0.0)

        # Cap similarity to avoid outliers
        sim_matrix = np.minimum(sim_matrix, self.similarity_cap)

        # Mean off-diagonal similarity
        sum_sims = np.sum(sim_matrix)
        num_pairs = len(resp_texts) * (len(resp_texts) - 1)
        avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0

        # Invert to get diversity
        return 1.0 - avg_sim
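
    # Illustrative arithmetic only (assumed similarities): with two responses
    # whose pairwise cosine similarity is 0.92, both off-diagonal entries are
    # capped at similarity_cap = 0.85, so avg_sim = 0.85 and the diversity
    # score is 1.0 - 0.85 = 0.15, which just meets the default
    # diversity_threshold of 0.15.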

    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """
        Decide if we're 'confident' based on multiple metric thresholds.
        """
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]
        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold + 0.05)  # Extra buffer
        ]
        # Must pass all primary checks, and at least 2 of the 3 secondary
        return all(primary_conditions) and (sum(secondary_conditions) >= 2)
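
    # Illustrative decision only (assumed metric values): with the defaults,
    # top_score=0.62, response_diversity=0.40 and response_length_score=0.90
    # satisfy all three primary conditions; relevance=0.55 (>= 0.30) and
    # top_score=0.62 (>= 0.50) satisfy two secondary conditions even if
    # top_3_score_gap=0.03 misses the 0.05 requirement, so the result is
    # considered confident.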

    def _average_length_score(self, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an average length score across all responses.
        """
        length_scores = []
        for response, _ in responses:
            length_scores.append(self._length_score(response))
        return float(np.mean(length_scores)) if length_scores else 0.0

    def _length_score(self, text: str) -> float:
        """
        Calculate how well the text meets the length requirement.
        Scores 1.0 if the text has at least min_response_length words and is
        not overly long; otherwise the score scales down.
        """
        words = len(text.split())
        if words < self.min_response_length:
            return words / float(self.min_response_length)
        elif words > 60:
            return max(0.5, 60.0 / words)  # Slight penalty for very long responses
        return 1.0
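
    # Illustrative arithmetic only (assumed word counts): with the default
    # min_response_length of 5, a 3-word reply scores 3 / 5 = 0.6, an 80-word
    # reply scores max(0.5, 60 / 80) = 0.75, and anything from 5 to 60 words
    # scores 1.0.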

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """
        Calculate the average gap between consecutive scores in the top N.
        """
        if len(scores) < 2:
            return 0.0
        top_n = min(len(scores), top_n)
        gaps = []
        for i in range(top_n - 1):
            gaps.append(scores[i] - scores[i + 1])
        return float(np.mean(gaps)) if gaps else 0.0
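

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module. It wires the
    # checker to a hypothetical stand-in for TFDataPipeline so the quality
    # check can be run without the real encoder; the random embeddings are
    # placeholders, so the relevance and diversity numbers are only
    # illustrative.
    class _StubPipeline:
        """Hypothetical stub exposing the two methods the checker relies on."""

        def encode_query(self, query: str) -> np.ndarray:
            rng = np.random.default_rng(abs(hash(query)) % (2 ** 32))
            return rng.normal(size=512)

        def encode_responses(self, texts: List[str]) -> np.ndarray:
            rng = np.random.default_rng(len(texts))
            return rng.normal(size=(len(texts), 512))

    checker = ResponseQualityChecker(data_pipeline=_StubPipeline())
    ranked = [
        ("You can reset your password from the account settings page.", 0.62),
        ("Password resets are handled under account settings.", 0.55),
        ("Contact support if the reset email never arrives.", 0.41),
    ]
    print(checker.check_response_quality("How do I reset my password?", ranked))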