import os
import json
import time

import numpy as np
from tqdm import tqdm
import nltk

from Logger import GetLogger, MetricsLogger
from Embeddings import GetEmbeddings

# Metrics
from sklearn.metrics.pairwise import cosine_similarity
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from bert_score import score as bert_score


class Evaluator:
    """
    Evaluation pipeline for the financial QA Agent.
    Uses eval_dataset.json to run queries, collect answers, and compute metrics.
    """

    def __init__(self, config_path="config.json", logger=None):
        with open(config_path, "r") as f:
            self.config = json.load(f)
        self.paths = self.config["paths"]

        if not logger:
            obj = GetLogger()
            logger = obj.get_logger()
        self.logger = logger

        # Metrics logger
        self.metrics_logger = MetricsLogger(logger=self.logger)

        # Initialize Agent
        self.agent = GetEmbeddings(config_path=config_path, logger=self.logger)
        self.agent.run()              # Load or rebuild FAISS + embeddings
        self.agent.load_summarizer()  # Load summarizer
        self.encoder = self.agent.load_encoder()

        # Load dataset
        self.dataset = self.load_dataset()
        self.results = []
        self.failed_queries = []

        nltk.download('punkt', quiet=True)

    def load_dataset(self):
        path = self.paths["eval_dataset"]
        if not os.path.exists(path):
            raise FileNotFoundError(f"Dataset not found: {path}")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def measure_latency(self, func, *args, **kwargs):
        """Helper: measure time taken by a function call."""
        start = time.time()
        result = func(*args, **kwargs)
        latency = time.time() - start
        return result, latency

    def evaluate_query(self, query, reference):
        """Run one query, compare the answer vs. the reference, and compute metrics."""
        try:
            # Run pipeline
            system_answer, latency = self.measure_latency(self.agent.answer_query, query)

            # 1. Embedding similarity (proxy for retrieval quality)
            ref_emb = self.encoder.encode([reference], convert_to_numpy=True)
            ans_emb = self.encoder.encode([system_answer], convert_to_numpy=True)
            retrieval_quality = float(cosine_similarity(ref_emb, ans_emb)[0][0])

            # 2. ROUGE-L
            scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
            rouge_l = scorer.score(reference, system_answer)['rougeL'].fmeasure

            # 3. BLEU (with smoothing for short texts)
            smoothie = SmoothingFunction().method4
            bleu = sentence_bleu([reference.split()], system_answer.split(),
                                 smoothing_function=smoothie)

            # 4. BERTScore (semantic similarity)
            P, R, F1 = bert_score([system_answer], [reference], lang="en")
            bert_f1 = float(F1.mean())

            metrics = {
                "query": query,
                "reference": reference,
                "system_answer": system_answer,
                "retrieval_quality": retrieval_quality,
                "rougeL": rouge_l,
                "bleu": bleu,
                "bertscore_f1": bert_f1,
                "latency_sec": latency
            }

            # Log into metrics logger
            self.metrics_logger.log_query_metrics(query, metrics)
            return metrics
        except Exception as e:
            self.logger.error(f"Error evaluating query '{query}': {e}")
            # Track the failure so it can be written to the failed-queries file
            self.failed_queries.append({"query": query, "error": str(e)})
            return None

    def aggregate_summary(self):
        """Aggregate metrics across all queries into global averages."""
        if not self.results:
            return {}
        summary = {
            "avg_retrieval_quality": float(np.mean([r["retrieval_quality"] for r in self.results])),
            "avg_rougeL": float(np.mean([r["rougeL"] for r in self.results])),
            "avg_bleu": float(np.mean([r["bleu"] for r in self.results])),
            "avg_bertscore_f1": float(np.mean([r["bertscore_f1"] for r in self.results])),
            "avg_latency_sec": float(np.mean([r["latency_sec"] for r in self.results])),
            "num_queries": len(self.results)
        }
        return summary

    def run(self):
        """Run evaluation on the entire dataset."""
        self.logger.info("Starting Evaluation...")
        for item in tqdm(self.dataset, desc="Queries"):
            query = item["query"]
            reference = item["reference"]
            result = self.evaluate_query(query, reference)
            if result:
                self.results.append(result)

        # Save per-query results
        with open(self.paths["eval_results"], "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)

        if self.failed_queries:
            with open(self.paths["failed_queries"], "w", encoding="utf-8") as f:
                json.dump(self.failed_queries, f, indent=2)

        # Aggregate averages across all queries
        summary = self.aggregate_summary()
        self.logger.info(f"📊 Evaluation summary: {summary}")

        # Also save the aggregated summary separately
        with open(self.paths.get("eval_summary", "eval_summary.json"), "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        return self.results, summary


if __name__ == "__main__":
    evaluator = Evaluator()
    results, summary = evaluator.run()
    print("\n=== Sample Results ===")
    print(json.dumps(results[:2], indent=2))
    print("\n=== Summary ===")
    print(json.dumps(summary, indent=2))
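

# --- Assumed input formats (illustrative only, inferred from the keys read above) ---
# config.json is expected to expose a "paths" section; the filenames below are
# placeholder examples, not values defined by this project. "eval_summary" is
# optional, since run() falls back to "eval_summary.json".
#
#   {
#     "paths": {
#       "eval_dataset": "data/eval_dataset.json",
#       "eval_results": "output/eval_results.json",
#       "failed_queries": "output/failed_queries.json",
#       "eval_summary": "output/eval_summary.json"
#     }
#   }
#
# eval_dataset.json is assumed to be a JSON list of {"query": ..., "reference": ...}
# objects, since run() reads exactly those two fields from each item.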