import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity
from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from chatbot_model import RetrievalChatbot


class ResponseQualityChecker:
    """Enhanced quality checking with dynamic thresholds."""

    def __init__(
        self,
        chatbot: 'RetrievalChatbot',
        confidence_threshold: float = 0.6,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85  # Renamed from max_similarity_ratio and used in diversity calc
    ):
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.chatbot = chatbot

        # Dynamic thresholds based on response patterns
        self.thresholds = {
            'relevance': 0.35,
            'length_score': 0.85,
            'score_gap': 0.07
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of responses based on various metrics.

        Args:
            query: The user's query
            responses: List of (response_text, score) tuples

        Returns:
            Dict containing quality metrics and confidence assessment
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # Calculate core metrics
        metrics = {
            'response_diversity': self.calculate_diversity(responses),
            'query_response_relevance': self.calculate_relevance(query, responses),
            'response_length_score': np.mean([
                self._calculate_length_score(response) for response, _ in responses
            ]),
            'top_score': responses[0][1],
            'top_3_score_gap': self._calculate_score_gap(
                [score for _, score in responses], top_n=3
            )
        }

        # Determine confidence using thresholds
        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics

    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """Calculate relevance as weighted similarity between query and responses."""
        if not responses:
            return 0.0

        # Get embeddings
        query_embedding = self.encode_query(query)
        response_embeddings = [self.encode_text(response) for response, _ in responses]

        # Compute similarities with decreasing weights for later responses
        similarities = cosine_similarity([query_embedding], response_embeddings)[0]
        weights = np.array([1.0 / (i + 1) for i in range(len(similarities))])

        return np.average(similarities, weights=weights)

    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """Calculate diversity with length normalization and similarity capping."""
        if not responses:
            return 0.0

        embeddings = [self.encode_text(response) for response, _ in responses]
        if len(embeddings) < 2:
            return 1.0

        # Calculate similarities and apply cap
        similarity_matrix = cosine_similarity(embeddings)
        similarity_matrix = np.minimum(similarity_matrix, self.similarity_cap)

        # Apply length normalization (guard against empty responses in the denominator)
        lengths = [len(resp[0].split()) for resp in responses]
        length_ratios = np.array([
            min(a, b) / max(a, b, 1) for a in lengths for b in lengths
        ])
        length_ratios = length_ratios.reshape(len(responses), len(responses))

        # Combine factors with weights
        adjusted_similarity = similarity_matrix * 0.7 + length_ratios * 0.3

        # Average the off-diagonal entries only; subtract the actual diagonal
        # (self-similarity) rather than assuming it equals 1.0 after capping
        sum_similarities = np.sum(adjusted_similarity) - np.trace(adjusted_similarity)
        num_pairs = len(responses) * (len(responses) - 1)
        avg_similarity = sum_similarities / num_pairs if num_pairs > 0 else 0.0

        return 1 - avg_similarity

    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """Determine confidence using primary and secondary conditions."""
        # Primary conditions (must all be met)
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]

        # Secondary conditions (majority must be met)
        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold * 1.1)  # Extra confidence boost
        ]

        return all(primary_conditions) and sum(secondary_conditions) >= 2

    def _calculate_length_score(self, response: str) -> float:
        """Calculate length score with penalty for very short or long responses."""
        words = len(response.split())
        if words < self.min_response_length:
            return words / self.min_response_length
        elif words > 50:  # Penalty for very long responses
            return min(1.0, 50 / words)
        return 1.0

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """Calculate average gap between top N scores."""
        if len(scores) < top_n + 1:
            return 0.0
        gaps = [scores[i] - scores[i + 1] for i in range(min(len(scores) - 1, top_n))]
        return np.mean(gaps)

    def encode_text(self, text: str) -> np.ndarray:
        """Encode response text to embedding."""
        embedding_tensor = self.chatbot.encode_responses([text])
        embedding = embedding_tensor.numpy()[0].astype('float32')
        return self._normalize_embedding(embedding)

    def encode_query(self, query: str) -> np.ndarray:
        """Encode query text to embedding."""
        embedding_tensor = self.chatbot.encode_query(query)
        embedding = embedding_tensor.numpy()[0].astype('float32')
        return self._normalize_embedding(embedding)

    def _normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
        """Normalize embedding vector."""
        norm = np.linalg.norm(embedding)
        return embedding / norm if norm > 0 else embedding
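

# --- Illustrative usage sketch (assumption: not part of the original module). ---
# The stub below is a hypothetical stand-in for RetrievalChatbot; the real class is
# only assumed to return tensor-like objects exposing .numpy() with shape (n, dim),
# as implied by encode_text/encode_query above. Embeddings here are random, so the
# printed metrics are meaningless; this only demonstrates the call pattern.
if __name__ == "__main__":
    class _FakeTensor:
        """Minimal tensor-like wrapper mimicking the encoder output used above."""

        def __init__(self, arr: np.ndarray):
            self._arr = arr

        def numpy(self) -> np.ndarray:
            return self._arr

    class _StubChatbot:
        """Hypothetical stand-in that returns random embeddings for demonstration."""

        def encode_query(self, query: str) -> _FakeTensor:
            return _FakeTensor(np.random.rand(1, 16).astype('float32'))

        def encode_responses(self, texts: List[str]) -> _FakeTensor:
            return _FakeTensor(np.random.rand(len(texts), 16).astype('float32'))

    checker = ResponseQualityChecker(chatbot=_StubChatbot())
    demo_responses = [
        ("You can reset your password from the account settings page.", 0.72),
        ("Open the settings page and choose 'Reset password'.", 0.65),
        ("Our support team is available around the clock.", 0.41),
        ("Please contact support for further help.", 0.38),
    ]
    print(checker.check_response_quality("How do I reset my password?", demo_responses))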