import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING

from sklearn.metrics.pairwise import cosine_similarity

from logger_config import config_logger

logger = config_logger(__name__)

if TYPE_CHECKING:
    from chatbot_model import RetrievalChatbot


class ResponseQualityChecker:
    """Quality checks for retrieved responses, with configurable thresholds."""

    def __init__(
        self,
        chatbot: 'RetrievalChatbot',
        confidence_threshold: float = 0.6,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85
    ):
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.chatbot = chatbot

        # Fixed metric thresholds used by _determine_confidence.
        self.thresholds = {
            'relevance': 0.35,
            'length_score': 0.85,
            'score_gap': 0.07
        }

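    # Illustrative only: a stricter deployment could raise the confidence bar, e.g.
    # ResponseQualityChecker(chatbot=bot, confidence_threshold=0.75), where `bot` is
    # assumed to be an already-initialized RetrievalChatbot.
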
    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of retrieved responses across several metrics.

        Args:
            query: The user's query.
            responses: List of (response_text, score) tuples, sorted by score
                in descending order.

        Returns:
            Dict containing quality metrics and the confidence assessment.
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        metrics = {
            'response_diversity': self.calculate_diversity(responses),
            'query_response_relevance': self.calculate_relevance(query, responses),
            'response_length_score': np.mean([
                self._calculate_length_score(response) for response, _ in responses
            ]),
            'top_score': responses[0][1],
            'top_3_score_gap': self._calculate_score_gap([score for _, score in responses], top_n=3)
        }

        metrics['is_confident'] = self._determine_confidence(metrics)

        logger.info(f"Quality metrics: {metrics}")
        return metrics

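    # Example call for check_response_quality (illustrative; `bot` is assumed to be a
    # loaded RetrievalChatbot and the candidates are sorted by score, highest first):
    #
    #   checker = ResponseQualityChecker(chatbot=bot)
    #   metrics = checker.check_response_quality(
    #       "How do I reset my password?",
    #       [
    #           ("You can reset it from the account settings page.", 0.78),
    #           ("Passwords can be changed under Settings > Security.", 0.71),
    #           ("Try contacting support about your account.", 0.55),
    #           ("Hello!", 0.30),
    #       ],
    #   )
    #   if metrics['is_confident']:
    #       ...  # serve the top response
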
    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """Calculate relevance as rank-weighted similarity between the query and responses."""
        if not responses:
            return 0.0

        query_embedding = self.encode_query(query)
        response_embeddings = [self.encode_text(response) for response, _ in responses]

        # Weight each similarity by 1 / rank so higher-ranked responses count more.
        similarities = cosine_similarity([query_embedding], response_embeddings)[0]
        weights = np.array([1.0 / (i + 1) for i in range(len(similarities))])

        return np.average(similarities, weights=weights)

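    # Worked example for calculate_relevance (illustrative): with four candidates the
    # rank weights are [1.0, 0.5, 0.333..., 0.25], so the weighted average is dominated
    # by how well the top-ranked response matches the query.
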
    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """Calculate diversity with length normalization and similarity capping."""
        if not responses:
            return 0.0

        embeddings = [self.encode_text(response) for response, _ in responses]
        if len(embeddings) < 2:
            return 1.0

        # Cap pairwise similarities so near-duplicates cannot dominate the average.
        similarity_matrix = cosine_similarity(embeddings)
        similarity_matrix = np.minimum(similarity_matrix, self.similarity_cap)

        # Pairwise length ratios in [0, 1]; similar lengths push the ratio toward 1.
        lengths = [len(resp[0].split()) for resp in responses]
        length_ratios = np.array([min(a, b) / max(a, b) for a in lengths for b in lengths])
        length_ratios = length_ratios.reshape(len(responses), len(responses))

        # Blend semantic similarity with length similarity.
        adjusted_similarity = similarity_matrix * 0.7 + length_ratios * 0.3

        # Average over off-diagonal pairs only (exclude each response's self-similarity).
        sum_similarities = np.sum(adjusted_similarity) - np.trace(adjusted_similarity)
        num_pairs = len(responses) * (len(responses) - 1)
        avg_similarity = sum_similarities / num_pairs if num_pairs > 0 else 0.0

        return 1 - avg_similarity

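    # Worked example for calculate_diversity (illustrative): two near-duplicate
    # responses of similar length have capped similarity 0.85 and length ratio ~1.0,
    # giving adjusted similarity 0.85 * 0.7 + 1.0 * 0.3 = 0.895 and diversity ~0.105;
    # two unrelated responses (similarity ~0.0, length ratio 0.5) give
    # 0.0 * 0.7 + 0.5 * 0.3 = 0.15 and diversity 0.85.
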
    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """Determine confidence using primary and secondary conditions."""
        # All primary conditions must hold.
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]

        # At least two of the secondary conditions must hold.
        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold * 1.1)
        ]

        return all(primary_conditions) and sum(secondary_conditions) >= 2

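    # With the default settings, _determine_confidence requires top_score >= 0.6,
    # diversity >= 0.15 and length score >= 0.85 to all hold, plus at least two of
    # {relevance >= 0.35, top-3 score gap >= 0.07, top_score >= 0.66}.
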
    def _calculate_length_score(self, response: str) -> float:
        """Calculate length score with penalty for very short or long responses."""
        words = len(response.split())

        if words < self.min_response_length:
            return words / self.min_response_length
        elif words > 50:
            return min(1.0, 50 / words)
        return 1.0

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """Calculate the average drop between consecutive scores over the top `top_n` gaps."""
        if len(scores) < top_n + 1:
            return 0.0
        gaps = [scores[i] - scores[i + 1] for i in range(min(len(scores) - 1, top_n))]
        return np.mean(gaps)

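    # Worked example for _calculate_score_gap (illustrative): scores
    # [0.80, 0.70, 0.64, 0.60] give gaps [0.10, 0.06, 0.04] and an average gap of
    # ~0.067, just under the default 0.07 'score_gap' threshold.
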
    def encode_text(self, text: str) -> np.ndarray:
        """Encode response text to a normalized embedding."""
        embedding_tensor = self.chatbot.encode_responses([text])
        embedding = embedding_tensor.numpy()[0].astype('float32')
        return self._normalize_embedding(embedding)

    def encode_query(self, query: str) -> np.ndarray:
        """Encode query text to a normalized embedding."""
        embedding_tensor = self.chatbot.encode_query(query)
        embedding = embedding_tensor.numpy()[0].astype('float32')
        return self._normalize_embedding(embedding)

    def _normalize_embedding(self, embedding: np.ndarray) -> np.ndarray:
        """L2-normalize an embedding vector, leaving zero vectors unchanged."""
        norm = np.linalg.norm(embedding)
        return embedding / norm if norm > 0 else embedding
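

# Minimal smoke test (illustrative sketch only): `_ArrayTensor` and `_StubChatbot` are
# hypothetical stand-ins for the RetrievalChatbot encoder interface, used here so the
# checker can be exercised without loading a real model.
if __name__ == "__main__":
    import zlib

    class _ArrayTensor:
        """Tiny wrapper mimicking a framework tensor's `.numpy()` accessor."""

        def __init__(self, array: np.ndarray):
            self._array = array

        def numpy(self) -> np.ndarray:
            return self._array

    class _StubChatbot:
        """Deterministic pseudo-encoder standing in for RetrievalChatbot."""

        def _embed(self, text: str) -> np.ndarray:
            rng = np.random.default_rng(zlib.crc32(text.encode("utf-8")))
            return rng.standard_normal(16)

        def encode_responses(self, texts: List[str]) -> _ArrayTensor:
            return _ArrayTensor(np.stack([self._embed(t) for t in texts]))

        def encode_query(self, query: str) -> _ArrayTensor:
            return _ArrayTensor(self._embed(query)[np.newaxis, :])

    checker = ResponseQualityChecker(chatbot=_StubChatbot())
    candidates = [
        ("You can reset your password from the account settings page.", 0.78),
        ("Passwords can be changed under the security tab in settings.", 0.71),
        ("Try contacting support for help with your account.", 0.55),
        ("Hello there!", 0.30),
    ]
    print(checker.check_response_quality("How do I reset my password?", candidates))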