import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity

from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from tf_data_pipeline import TFDataPipeline


class ResponseQualityChecker:
    """
    Enhanced quality checking that calculates:
      - Relevance between query & responses
      - Diversity among top responses
      - Response length scoring
      - Confidence determination based on multiple thresholds
    """

    def __init__(
        self,
        data_pipeline: 'TFDataPipeline',
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85,
    ):
        """
        Args:
            data_pipeline: Reference to TFDataPipeline for encoding
            confidence_threshold: Minimum top_score for a 'confident' result
            diversity_threshold: Minimum required diversity among top responses
            min_response_length: Minimum word count for a decent response
            similarity_cap: Cap on pairwise similarity in the diversity calculation
        """
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.data_pipeline = data_pipeline
        # Additional thresholds for more refined checks
        self.thresholds = {
            'relevance': 0.30,     # Minimum query-response relevance
            'length_score': 0.80,  # Minimum average response length score
            'score_gap': 0.05      # Minimum average gap between top scores
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a set of ranked responses for a given query.

        Args:
            query: The user's original query
            responses: List of (response_text, score) sorted by descending score

        Returns:
            Dictionary of metrics, including 'is_confident' and others
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # 1) Calculate the relevant metrics
        metrics = {}
        metrics['response_diversity'] = self.calculate_diversity(responses)
        metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
        metrics['response_length_score'] = self._average_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)

        # 2) Determine confidence
        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics
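
    # A typical returned dict (illustrative values only, not from the source)
    # might look like:
    # {'response_diversity': 0.42, 'query_response_relevance': 0.37,
    #  'response_length_score': 0.91, 'top_score': 0.53,
    #  'top_3_score_gap': 0.06, 'is_confident': True}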

    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an overall 'relevance' metric between the query and the top responses.
        Uses an exponential transform on the similarity to penalize weaker matches.
        """
        if not responses:
            return 0.0

        # Encode query and responses
        query_emb = self.data_pipeline.encode_query(query)
        resp_texts = [r for r, _ in responses]
        resp_embs = self.data_pipeline.encode_responses(resp_texts)

        # Normalize embeddings
        query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12)
        resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12
        resp_embs = resp_embs / resp_norms

        # Cosine similarity
        sims = cosine_similarity([query_emb], resp_embs)[0]

        # Exponential transform: higher sims remain close to 1, lower sims drop quickly
        sims = np.exp(sims - 1.0)

        # Weighted average: give heavier weighting to higher-ranked items
        weights = np.exp(-np.arange(len(sims)) / 2.0)
        weighted_avg = np.average(sims, weights=weights)
        return float(weighted_avg)
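
    # Illustrative worked example (hypothetical numbers, not from the source):
    # for raw cosine similarities [0.9, 0.6, 0.3], np.exp(sims - 1.0) gives
    # roughly [0.905, 0.670, 0.497]; with rank weights np.exp(-[0, 1, 2] / 2.0)
    # = [1.0, 0.607, 0.368], the weighted-average relevance comes out near 0.76.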

    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """
        Calculate how 'different' the top responses are from each other.
        Diversity = 1 - avg_cosine_similarity (capped).
        """
        if len(responses) < 2:
            return 1.0  # A single response is trivially 'unique'

        resp_texts = [r for r, _ in responses]
        embs = self.data_pipeline.encode_responses(resp_texts)

        # Pairwise similarity
        sim_matrix = cosine_similarity(embs, embs)
        np.fill_diagonal(sim_matrix, 0.0)

        # Cap similarity to avoid outliers
        sim_matrix = np.minimum(sim_matrix, self.similarity_cap)

        # Mean off-diagonal similarity
        sum_sims = np.sum(sim_matrix)
        num_pairs = len(resp_texts) * (len(resp_texts) - 1)
        avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0

        # Invert to get diversity
        return 1.0 - avg_sim
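
    # Illustrative example (hypothetical numbers): if three responses have
    # capped pairwise similarities averaging 0.60, the diversity score is
    # 1.0 - 0.60 = 0.40, which clears the default diversity_threshold of 0.15.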

    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """
        Decide if we're 'confident' based on multiple metric thresholds.
        """
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]
        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold + 0.05)  # Extra buffer
        ]
        # Must pass all primary checks, and at least 2 of the 3 secondary
        return all(primary_conditions) and (sum(secondary_conditions) >= 2)
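
    # Illustrative example (hypothetical metrics, default thresholds): with
    # top_score=0.50, response_diversity=0.40, and response_length_score=0.90,
    # all primary checks pass; relevance=0.35, top_3_score_gap=0.06, and
    # top_score >= 0.50 satisfy the secondary checks, so is_confident is True.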

    def _average_length_score(self, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an average length score across all responses.
        """
        length_scores = [self._length_score(response) for response, _ in responses]
        return float(np.mean(length_scores)) if length_scores else 0.0

    def _length_score(self, text: str) -> float:
        """
        Calculate how well the text meets our length requirement.
        Scores 1.0 if text is >= min_response_length and not too long,
        else it scales down.
        """
        words = len(text.split())
        if words < self.min_response_length:
            return words / float(self.min_response_length)
        elif words > 60:
            return max(0.5, 60.0 / words)  # Slight penalty for very long responses
        return 1.0
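
    # Illustrative examples (assuming the default min_response_length of 5):
    # a 3-word reply scores 3 / 5 = 0.6, a 20-word reply scores 1.0, and an
    # 80-word reply scores max(0.5, 60 / 80) = 0.75.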

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """
        Calculate the average gap between consecutive scores in the top N.
        """
        if len(scores) < 2:
            return 0.0
        top_n = min(len(scores), top_n)
        gaps = [scores[i] - scores[i + 1] for i in range(top_n - 1)]
        return float(np.mean(gaps)) if gaps else 0.0
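

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). The stub
# pipeline below is a hypothetical stand-in that provides the two methods the
# checker calls: encode_query(str) -> np.ndarray and
# encode_responses(List[str]) -> np.ndarray. Replace it with a real
# TFDataPipeline instance in actual use.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class _StubPipeline:
        """Returns pseudo-random embeddings seeded from the text hash (demo only)."""

        def encode_query(self, query: str) -> np.ndarray:
            rng = np.random.default_rng(abs(hash(query)) % (2 ** 32))
            return rng.normal(size=128)

        def encode_responses(self, texts: List[str]) -> np.ndarray:
            return np.stack([self.encode_query(t) for t in texts])

    checker = ResponseQualityChecker(data_pipeline=_StubPipeline())
    demo_responses = [
        ("You can reset your password from the account settings page.", 0.62),
        ("Password resets are handled under Settings > Security.", 0.55),
        ("Try logging out and back in first.", 0.41),
    ]
    result = checker.check_response_quality("How do I reset my password?", demo_responses)
    print(result["is_confident"], result["top_score"])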