import os
import json
import time

import numpy as np
from tqdm import tqdm
import nltk

from Logger import GetLogger, MetricsLogger
from Embeddings import GetEmbeddings

from sklearn.metrics.pairwise import cosine_similarity
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from bert_score import score as bert_score
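
# Third-party dependencies used above (the PyPI install names are an assumption, not
# taken from a requirements file in this repo): numpy, tqdm, nltk, scikit-learn,
# rouge-score, bert-score. Logger and Embeddings are assumed to be local project modules.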


class Evaluator:
    """
    Evaluation pipeline for the financial QA agent.

    Uses eval_dataset.json to run queries, collect answers, and compute metrics.
    """
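
    # A minimal sketch of the config.json layout this class assumes, inferred from the
    # keys read below; the file paths are illustrative placeholders. "eval_summary" is
    # optional and defaults to "eval_summary.json" in run().
    #
    #   {
    #     "paths": {
    #       "eval_dataset": "data/eval_dataset.json",
    #       "eval_results": "outputs/eval_results.json",
    #       "failed_queries": "outputs/failed_queries.json",
    #       "eval_summary": "outputs/eval_summary.json"
    #     }
    #   }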

    def __init__(self, config_path="config.json", logger=None):
        # Load configuration and resolve input/output file paths.
        with open(config_path, "r") as f:
            self.config = json.load(f)
        self.paths = self.config["paths"]

        # Reuse the caller's logger if one was passed in, otherwise create our own.
        if not logger:
            logger = GetLogger().get_logger()
        self.logger = logger

        self.metrics_logger = MetricsLogger(logger=self.logger)

        # Build the QA agent components: embeddings, summarizer, and sentence encoder.
        self.agent = GetEmbeddings(config_path=config_path, logger=self.logger)
        self.agent.run()
        self.agent.load_summarizer()
        self.encoder = self.agent.load_encoder()

        self.dataset = self.load_dataset()
        self.results = []
        self.failed_queries = []

        nltk.download('punkt', quiet=True)

    def load_dataset(self):
        """Load the evaluation dataset from the path configured in config.json."""
        path = self.paths["eval_dataset"]
        if not os.path.exists(path):
            raise FileNotFoundError(f"Dataset not found: {path}")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
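
    # The eval dataset is assumed to be a JSON list of items with "query" and
    # "reference" fields, matching how run() consumes it below, e.g.:
    #
    #   [
    #     {"query": "<question text>", "reference": "<gold answer text>"},
    #     ...
    #   ]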

    def measure_latency(self, func, *args, **kwargs):
        """Helper: measure the time taken by a function call."""
        start = time.time()
        result = func(*args, **kwargs)
        latency = time.time() - start
        return result, latency

    def evaluate_query(self, query, reference):
        """Run one query, compare the answer against the reference, and compute metrics."""
        try:
            # Generate the system answer and record how long it takes.
            system_answer, latency = self.measure_latency(self.agent.answer_query, query)

            # Semantic similarity between the reference and answer embeddings.
            ref_emb = self.encoder.encode([reference], convert_to_numpy=True)
            ans_emb = self.encoder.encode([system_answer], convert_to_numpy=True)
            retrieval_quality = float(cosine_similarity(ref_emb, ans_emb)[0][0])

            # ROUGE-L F1: longest-common-subsequence overlap with stemming.
            scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
            rouge_l = scorer.score(reference, system_answer)['rougeL'].fmeasure

            # BLEU with smoothing, since short answers often have no higher-order n-gram overlap.
            smoothie = SmoothingFunction().method4
            bleu = sentence_bleu([reference.split()], system_answer.split(), smoothing_function=smoothie)

            # BERTScore F1: similarity of contextual token embeddings.
            P, R, F1 = bert_score([system_answer], [reference], lang="en")
            bert_f1 = float(F1.mean())

            metrics = {
                "query": query,
                "reference": reference,
                "system_answer": system_answer,
                "retrieval_quality": retrieval_quality,
                "rougeL": rouge_l,
                "bleu": bleu,
                "bertscore_f1": bert_f1,
                "latency_sec": latency
            }

            self.metrics_logger.log_query_metrics(query, metrics)

            return metrics

        except Exception as e:
            self.logger.error(f"Error evaluating query '{query}': {e}")
            # Record the failure so run() can write it to the failed-queries file.
            self.failed_queries.append({"query": query, "error": str(e)})
            return None

    def aggregate_summary(self):
        """Aggregate metrics across all queries for global averages."""
        if not self.results:
            return {}

        summary = {
            "avg_retrieval_quality": float(np.mean([r["retrieval_quality"] for r in self.results])),
            "avg_rougeL": float(np.mean([r["rougeL"] for r in self.results])),
            "avg_bleu": float(np.mean([r["bleu"] for r in self.results])),
            "avg_bertscore_f1": float(np.mean([r["bertscore_f1"] for r in self.results])),
            "avg_latency_sec": float(np.mean([r["latency_sec"] for r in self.results])),
            "num_queries": len(self.results)
        }
        return summary

    def run(self):
        """Run the evaluation on the entire dataset and persist results and summary."""
        self.logger.info("Starting evaluation...")

        for item in tqdm(self.dataset, desc="Queries"):
            query = item["query"]
            reference = item["reference"]
            result = self.evaluate_query(query, reference)
            if result:
                self.results.append(result)

        # Persist per-query results.
        with open(self.paths["eval_results"], "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)

        # Persist any failed queries separately for later inspection.
        if self.failed_queries:
            with open(self.paths["failed_queries"], "w", encoding="utf-8") as f:
                json.dump(self.failed_queries, f, indent=2)

        # Aggregate and persist the global summary.
        summary = self.aggregate_summary()
        self.logger.info(f"Evaluation summary: {summary}")

        with open(self.paths.get("eval_summary", "eval_summary.json"), "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        return self.results, summary


if __name__ == "__main__":
    evaluator = Evaluator()
    results, summary = evaluator.run()

    print("\n=== Sample Results ===")
    print(json.dumps(results[:2], indent=2))
    print("\n=== Summary ===")
    print(json.dumps(summary, indent=2))