import numpy as np
from typing import List, Tuple, Dict, Any, TYPE_CHECKING
from sklearn.metrics.pairwise import cosine_similarity

from logger_config import config_logger
logger = config_logger(__name__)

if TYPE_CHECKING:
    from tf_data_pipeline import TFDataPipeline

class ResponseQualityChecker:
    """
    Enhanced quality checking that calculates:
      - Relevance between query & responses
      - Diversity among top responses
      - Response length scoring
      - Confidence determination based on multiple thresholds
    """

    def __init__(
        self,
        data_pipeline: 'TFDataPipeline',
        confidence_threshold: float = 0.45,
        diversity_threshold: float = 0.15,
        min_response_length: int = 5,
        similarity_cap: float = 0.85,
    ):
        """
        Args:
            data_pipeline: Reference to TFDataPipeline for encoding
            confidence_threshold: Minimum top_score for a 'confident' result
            diversity_threshold: Minimum required diversity among top responses
            min_response_length: Minimum words for a decent response
            similarity_cap: Cap on pairwise similarity for diversity calc
        """
        self.confidence_threshold = confidence_threshold
        self.diversity_threshold = diversity_threshold
        self.min_response_length = min_response_length
        self.similarity_cap = similarity_cap
        self.data_pipeline = data_pipeline

        # Additional thresholds for more refined checks
        self.thresholds = {
            'relevance': 0.30,     # Slightly relaxed
            'length_score': 0.80,  # Stricter length requirement
            'score_gap': 0.05      # Gap between top scores
        }

    def check_response_quality(
        self,
        query: str,
        responses: List[Tuple[str, float]]
    ) -> Dict[str, Any]:
        """
        Evaluate the quality of a set of ranked responses for a given query.

        Args:
            query: The user's original query
            responses: List of (response_text, score) sorted by descending score

        Returns:
            Dictionary of metrics, including 'is_confident' and others
        """
        if not responses:
            return {
                'response_diversity': 0.0,
                'query_response_relevance': 0.0,
                'is_confident': False,
                'top_score': 0.0,
                'response_length_score': 0.0,
                'top_3_score_gap': 0.0
            }

        # 1) Calculate relevant metrics
        metrics = {}
        metrics['response_diversity'] = self.calculate_diversity(responses)
        metrics['query_response_relevance'] = self.calculate_relevance(query, responses)
        metrics['response_length_score'] = self._average_length_score(responses)
        metrics['top_score'] = responses[0][1]
        metrics['top_3_score_gap'] = self._calculate_score_gap([s for _, s in responses], top_n=3)

        # 2) Determine confidence
        metrics['is_confident'] = self._determine_confidence(metrics)
        logger.info(f"Quality metrics: {metrics}")
        return metrics

    def calculate_relevance(self, query: str, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an overall 'relevance' metric between the query and the top responses.
        Uses an exponential transform on the similarity to penalize weaker matches.
        """
        if not responses:
            return 0.0

        # Encode query and responses
        query_emb = self.data_pipeline.encode_query(query)
        resp_texts = [r for r, _ in responses]
        resp_embs = self.data_pipeline.encode_responses(resp_texts)

        # Normalize embeddings
        query_emb = query_emb / (np.linalg.norm(query_emb) + 1e-12)
        resp_norms = np.linalg.norm(resp_embs, axis=1, keepdims=True) + 1e-12
        resp_embs = resp_embs / resp_norms

        # Cosine similarity
        sims = cosine_similarity(query_emb.reshape(1, -1), resp_embs)[0]

        # Exponential transform: higher sims remain close to 1, lower sims drop quickly
        sims = np.exp(sims - 1.0)
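        #   e.g. sim 1.00 -> 1.00, sim 0.70 -> exp(-0.30) ~ 0.74, sim 0.40 -> exp(-0.60) ~ 0.55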

        # Weighted average: give heavier weighting to higher-ranked items
        weights = np.exp(-np.arange(len(sims)) / 2.0)
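        # Ranks 0, 1, 2, ... receive weights 1.00, ~0.61, ~0.37, ...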
        weighted_avg = np.average(sims, weights=weights)
        return float(weighted_avg)

    def calculate_diversity(self, responses: List[Tuple[str, float]]) -> float:
        """
        Calculate how 'different' the top responses are from each other.
        Diversity = 1 - avg_cosine_similarity (capped).
        """
        if len(responses) < 2:
            return 1.0  # Single response is trivially 'unique'

        resp_texts = [r for r, _ in responses]
        embs = self.data_pipeline.encode_responses(resp_texts)

        # Pairwise similarity
        sim_matrix = cosine_similarity(embs, embs)
        np.fill_diagonal(sim_matrix, 0.0)

        # Cap similarity to avoid outliers
        sim_matrix = np.minimum(sim_matrix, self.similarity_cap)

        # Mean off-diagonal similarity
        sum_sims = np.sum(sim_matrix)
        num_pairs = len(resp_texts) * (len(resp_texts) - 1)
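        # e.g. 3 responses -> 3 * 2 = 6 off-diagonal entries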
        avg_sim = sum_sims / num_pairs if num_pairs > 0 else 0.0

        # Invert to get diversity
        return float(1.0 - avg_sim)

    def _determine_confidence(self, metrics: Dict[str, float]) -> bool:
        """
        Decide if we're 'confident' based on multiple metric thresholds.
        """
        primary_conditions = [
            metrics['top_score'] >= self.confidence_threshold,
            metrics['response_diversity'] >= self.diversity_threshold,
            metrics['response_length_score'] >= self.thresholds['length_score']
        ]

        secondary_conditions = [
            metrics['query_response_relevance'] >= self.thresholds['relevance'],
            metrics['top_3_score_gap'] >= self.thresholds['score_gap'],
            metrics['top_score'] >= (self.confidence_threshold + 0.05)  # Extra buffer
        ]

        # Must pass all primary checks, and at least 2 of the 3 secondary
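        # (with defaults, the buffered secondary check needs top_score >= 0.45 + 0.05 = 0.50)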
        return all(primary_conditions) and (sum(secondary_conditions) >= 2)

    def _average_length_score(self, responses: List[Tuple[str, float]]) -> float:
        """
        Compute an average length score across all responses.
        """
        length_scores = []
        for response, _ in responses:
            length_scores.append(self._length_score(response))
        return float(np.mean(length_scores)) if length_scores else 0.0

    def _length_score(self, text: str) -> float:
        """
        Calculate how well the text meets our length requirement.
        Scores 1.0 if text is >= min_response_length and not too long,
        else it scales down.
        """
        words = len(text.split())
        if words < self.min_response_length:
            return words / float(self.min_response_length)
        elif words > 60:  
            return max(0.5, 60.0 / words)  # Slight penalty for very long
        return 1.0

    def _calculate_score_gap(self, scores: List[float], top_n: int = 3) -> float:
        """
        Calculate the average gap between consecutive scores in the top N.
        """
        if len(scores) < 2:
            return 0.0
        top_n = min(len(scores), top_n)
        gaps = []
        for i in range(top_n - 1):
            gaps.append(scores[i] - scores[i + 1])
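        # e.g. scores [0.62, 0.48, 0.41] -> gaps [0.14, 0.07] -> mean 0.105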
        return float(np.mean(gaps)) if gaps else 0.0
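
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the pipeline): _StubPipeline below
# is a hypothetical stand-in for TFDataPipeline, with encode_query /
# encode_responses signatures assumed from the calls above. It returns random
# unit vectors, so the printed metrics are only a smoke test of the plumbing.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class _StubPipeline:
        """Hypothetical encoder stub: random, L2-normalized embeddings."""

        def __init__(self, dim: int = 8, seed: int = 0):
            self._rng = np.random.default_rng(seed)
            self._dim = dim

        def encode_query(self, query: str) -> np.ndarray:
            vec = self._rng.normal(size=self._dim)
            return vec / np.linalg.norm(vec)

        def encode_responses(self, texts: List[str]) -> np.ndarray:
            mat = self._rng.normal(size=(len(texts), self._dim))
            return mat / np.linalg.norm(mat, axis=1, keepdims=True)

    checker = ResponseQualityChecker(data_pipeline=_StubPipeline())
    ranked = [
        ("You can reset your password from the account settings page.", 0.62),
        ("Try signing out and back in before requesting a reset.", 0.48),
        ("Contact support if the reset email never arrives.", 0.41),
    ]
    print(checker.check_response_quality("How do I reset my password?", ranked))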
