financial-qa-agent / evaluation.py
import os
import json
import time
import numpy as np
from tqdm import tqdm
import nltk
from Logger import GetLogger, MetricsLogger
from Embeddings import GetEmbeddings
# Metrics
from sklearn.metrics.pairwise import cosine_similarity  # embedding cosine similarity
from rouge_score import rouge_scorer  # ROUGE-L overlap
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction  # BLEU with smoothing
from bert_score import score as bert_score  # BERTScore semantic similarity
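# Note: GetLogger and MetricsLogger come from the project-local Logger module, and
# GetEmbeddings from the project-local Embeddings module; this script assumes they
# expose get_logger(), log_query_metrics(), run(), load_summarizer(), load_encoder()
# and answer_query() as used below.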


class Evaluator:
    """
    Evaluation pipeline for the financial QA agent.
    Uses eval_dataset.json to run queries, collect answers, and compute metrics.
    """

    def __init__(self, config_path="config.json", logger=None):
        with open(config_path, "r") as f:
            self.config = json.load(f)
        self.paths = self.config["paths"]

        if not logger:
            obj = GetLogger()
            logger = obj.get_logger()
        self.logger = logger

        # Metrics logger
        self.metrics_logger = MetricsLogger(logger=self.logger)

        # Initialize agent
        self.agent = GetEmbeddings(config_path=config_path, logger=self.logger)
        self.agent.run()              # Load or rebuild FAISS index + embeddings
        self.agent.load_summarizer()  # Load summarizer
        self.encoder = self.agent.load_encoder()

        # Load dataset
        self.dataset = self.load_dataset()
        self.results = []
        self.failed_queries = []

        # Ensure NLTK tokenizer data is available (no-op if already downloaded)
        nltk.download('punkt', quiet=True)
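    # config.json is expected to provide at least the paths used below. The shape
    # sketched here is assumed from how self.paths is accessed in this file; the
    # file names are illustrative and the real config may carry additional keys:
    #
    # {
    #   "paths": {
    #     "eval_dataset": "data/eval_dataset.json",
    #     "eval_results": "results/eval_results.json",
    #     "failed_queries": "results/failed_queries.json",
    #     "eval_summary": "results/eval_summary.json"
    #   }
    # }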
    def load_dataset(self):
        path = self.paths["eval_dataset"]
        if not os.path.exists(path):
            raise FileNotFoundError(f"Dataset not found: {path}")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
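    # Expected eval_dataset.json shape, assumed from how run() consumes each item
    # (any extra fields are ignored); the query/reference strings are illustrative:
    #
    # [
    #   {"query": "What was total revenue in FY2023?", "reference": "Total revenue was ..."},
    #   {"query": "...", "reference": "..."}
    # ]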
    def measure_latency(self, func, *args, **kwargs):
        """Helper: measure time taken by a function call."""
        start = time.time()
        result = func(*args, **kwargs)
        latency = time.time() - start
        return result, latency
    def evaluate_query(self, query, reference):
        """Run one query, compare answer vs. reference, compute metrics."""
        try:
            # Run pipeline
            system_answer, latency = self.measure_latency(self.agent.answer_query, query)

            # 1. Embedding similarity (proxy for retrieval quality)
            ref_emb = self.encoder.encode([reference], convert_to_numpy=True)
            ans_emb = self.encoder.encode([system_answer], convert_to_numpy=True)
            retrieval_quality = float(cosine_similarity(ref_emb, ans_emb)[0][0])

            # 2. ROUGE-L
            scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
            rouge_l = scorer.score(reference, system_answer)['rougeL'].fmeasure

            # 3. BLEU (with smoothing for short texts)
            smoothie = SmoothingFunction().method4
            bleu = sentence_bleu([reference.split()], system_answer.split(), smoothing_function=smoothie)

            # 4. BERTScore (semantic similarity)
            P, R, F1 = bert_score([system_answer], [reference], lang="en")
            bert_f1 = float(F1.mean())

            metrics = {
                "query": query,
                "reference": reference,
                "system_answer": system_answer,
                "retrieval_quality": retrieval_quality,
                "rougeL": rouge_l,
                "bleu": bleu,
                "bertscore_f1": bert_f1,
                "latency_sec": latency
            }

            # Log into metrics logger
            self.metrics_logger.log_query_metrics(query, metrics)
            return metrics
        except Exception as e:
            self.logger.error(f"Error evaluating query '{query}': {e}")
            # Record the failure so run() can persist it to the failed_queries file
            self.failed_queries.append({"query": query, "error": str(e)})
            return None
    def aggregate_summary(self):
        """Aggregate metrics across all queries for global averages."""
        if not self.results:
            return {}
        summary = {
            "avg_retrieval_quality": float(np.mean([r["retrieval_quality"] for r in self.results])),
            "avg_rougeL": float(np.mean([r["rougeL"] for r in self.results])),
            "avg_bleu": float(np.mean([r["bleu"] for r in self.results])),
            "avg_bertscore_f1": float(np.mean([r["bertscore_f1"] for r in self.results])),
            "avg_latency_sec": float(np.mean([r["latency_sec"] for r in self.results])),
            "num_queries": len(self.results)
        }
        return summary
    def run(self):
        """Run evaluation on the entire dataset."""
        self.logger.info("Starting evaluation...")
        for item in tqdm(self.dataset, desc="Queries"):
            query = item["query"]
            reference = item["reference"]
            result = self.evaluate_query(query, reference)
            if result:
                self.results.append(result)

        # Save per-query results and any failed queries
        with open(self.paths["eval_results"], "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)
        if self.failed_queries:
            with open(self.paths["failed_queries"], "w", encoding="utf-8") as f:
                json.dump(self.failed_queries, f, indent=2)

        # Compute and save the aggregated metrics summary
        summary = self.aggregate_summary()
        self.logger.info(f"📊 Evaluation summary: {summary}")
        with open(self.paths.get("eval_summary", "eval_summary.json"), "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)
        return self.results, summary


if __name__ == "__main__":
    evaluator = Evaluator()
    results, summary = evaluator.run()

    print("\n=== Sample Results ===")
    print(json.dumps(results[:2], indent=2))
    print("\n=== Summary ===")
    print(json.dumps(summary, indent=2))
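# Example of running this module directly (assumes config.json and the eval dataset
# exist at the configured paths); the summary values shown are illustrative only:
#
#   $ python evaluation.py
#   === Summary ===
#   {
#     "avg_retrieval_quality": 0.81,
#     "avg_rougeL": 0.42,
#     "avg_bleu": 0.18,
#     "avg_bertscore_f1": 0.88,
#     "avg_latency_sec": 2.4,
#     "num_queries": 25
#   }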