|
|
|
|
|
import time |
|
from typing import Dict, Any, List, Optional |
|
from dataclasses import dataclass, field |
|
from datetime import datetime, timedelta |
|
import json |
|
from pathlib import Path |
|
|
|
from ankigen_core.logging import logger |
|
|
|
|
|
@dataclass
class AgentExecution:
    """Record of one agent run: timing, outcome, token usage, and cost."""
    agent_name: str
    start_time: datetime
    end_time: datetime
    success: bool
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    cost: Optional[float] = None
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration(self) -> float:
        """Wall-clock duration of the execution, in seconds."""
        elapsed = self.end_time - self.start_time
        return elapsed.total_seconds()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary view of this record.

        Timestamps are rendered as ISO-8601 strings and the derived
        ``duration`` is included alongside the raw fields.
        """
        record: Dict[str, Any] = {}
        record["agent_name"] = self.agent_name
        record["start_time"] = self.start_time.isoformat()
        record["end_time"] = self.end_time.isoformat()
        record["duration"] = self.duration
        record["success"] = self.success
        record["input_tokens"] = self.input_tokens
        record["output_tokens"] = self.output_tokens
        record["cost"] = self.cost
        record["error_message"] = self.error_message
        record["metadata"] = self.metadata
        return record
|
|
|
|
|
@dataclass
class AgentStats:
    """Running aggregate of all recorded executions for one agent."""
    agent_name: str
    total_executions: int = 0
    successful_executions: int = 0
    total_duration: float = 0.0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost: float = 0.0
    error_count: int = 0
    last_execution: Optional[datetime] = None

    @property
    def success_rate(self) -> float:
        """Percentage of executions that succeeded (0.0 when none recorded)."""
        if not self.total_executions:
            return 0.0
        return (self.successful_executions / self.total_executions) * 100

    @property
    def average_duration(self) -> float:
        """Mean execution duration in seconds (0.0 when none recorded)."""
        if not self.total_executions:
            return 0.0
        return self.total_duration / self.total_executions

    @property
    def average_cost(self) -> float:
        """Mean cost per execution (0.0 when none recorded)."""
        if not self.total_executions:
            return 0.0
        return self.total_cost / self.total_executions

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary of raw totals and derived rates."""
        last = self.last_execution
        return {
            "agent_name": self.agent_name,
            "total_executions": self.total_executions,
            "successful_executions": self.successful_executions,
            "success_rate": self.success_rate,
            "total_duration": self.total_duration,
            "average_duration": self.average_duration,
            "total_input_tokens": self.total_input_tokens,
            "total_output_tokens": self.total_output_tokens,
            "total_cost": self.total_cost,
            "average_cost": self.average_cost,
            "error_count": self.error_count,
            "last_execution": last.isoformat() if last else None,
        }
|
|
|
|
|
class AgentMetrics:
    """Agent performance metrics collector and analyzer.

    Keeps an in-memory list of AgentExecution records plus per-agent
    aggregate AgentStats, and appends every execution to a daily JSONL
    file under ``persistence_dir`` so recent history survives restarts.
    Only the last 7 days of persisted records are reloaded on startup.
    """

    def __init__(self, persistence_dir: Optional[str] = None):
        """Create the collector and load recently persisted records.

        Args:
            persistence_dir: Directory for the daily JSONL files. Defaults
                to "metrics/agents"; created (with parents) if missing.
        """
        self.persistence_dir = Path(persistence_dir) if persistence_dir else Path("metrics/agents")
        self.persistence_dir.mkdir(parents=True, exist_ok=True)

        self.executions: List[AgentExecution] = []
        self.agent_stats: Dict[str, AgentStats] = {}
        self._load_persisted_metrics()

    def record_execution(
        self,
        agent_name: str,
        start_time: datetime,
        end_time: datetime,
        success: bool,
        input_tokens: Optional[int] = None,
        output_tokens: Optional[int] = None,
        cost: Optional[float] = None,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Record a single agent execution.

        Builds an AgentExecution, updates the in-memory aggregates, and
        appends the record to the day's JSONL file.

        Args:
            agent_name: Name of the agent that ran.
            start_time: When the execution started.
            end_time: When the execution finished.
            success: Whether the execution succeeded.
            input_tokens: Prompt tokens consumed, if known.
            output_tokens: Completion tokens produced, if known.
            cost: Monetary cost of the execution, if known.
            error_message: Error description for failed executions.
            metadata: Arbitrary extra data to store with the record.
        """
        execution = AgentExecution(
            agent_name=agent_name,
            start_time=start_time,
            end_time=end_time,
            success=success,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            error_message=error_message,
            metadata=metadata or {}
        )

        self.executions.append(execution)
        self._update_agent_stats(execution)
        self._persist_execution(execution)

        # Lazy %-args: the message is only formatted when DEBUG is enabled.
        logger.debug(
            "Recorded execution for %s: %.2fs, success=%s",
            agent_name, execution.duration, success,
        )

    def _update_agent_stats(self, execution: AgentExecution):
        """Fold a single execution into that agent's aggregate statistics."""
        agent_name = execution.agent_name

        if agent_name not in self.agent_stats:
            self.agent_stats[agent_name] = AgentStats(agent_name=agent_name)

        stats = self.agent_stats[agent_name]
        stats.total_executions += 1
        stats.total_duration += execution.duration
        stats.last_execution = execution.end_time

        if execution.success:
            stats.successful_executions += 1
        else:
            stats.error_count += 1

        # Explicit None checks: a legitimate 0 token count / 0.0 cost is a
        # recorded value, not an absent one (adding 0 is a no-op either way).
        if execution.input_tokens is not None:
            stats.total_input_tokens += execution.input_tokens

        if execution.output_tokens is not None:
            stats.total_output_tokens += execution.output_tokens

        if execution.cost is not None:
            stats.total_cost += execution.cost

    def get_agent_stats(self, agent_name: str) -> Optional[AgentStats]:
        """Return aggregate statistics for one agent, or None if unknown."""
        return self.agent_stats.get(agent_name)

    def get_all_agent_stats(self) -> Dict[str, AgentStats]:
        """Return a shallow copy of the per-agent statistics mapping."""
        return self.agent_stats.copy()

    def get_executions(
        self,
        agent_name: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        success_only: Optional[bool] = None
    ) -> List[AgentExecution]:
        """Return execution records matching all provided filters.

        Args:
            agent_name: Keep only executions of this agent.
            start_time: Keep only executions that started at or after this.
            end_time: Keep only executions that ended at or before this.
            success_only: True keeps successes, False keeps failures,
                None keeps both.
        """
        filtered = self.executions

        if agent_name:
            filtered = [e for e in filtered if e.agent_name == agent_name]

        if start_time:
            filtered = [e for e in filtered if e.start_time >= start_time]

        if end_time:
            filtered = [e for e in filtered if e.end_time <= end_time]

        if success_only is not None:
            filtered = [e for e in filtered if e.success == success_only]

        return filtered

    def get_performance_report(self, hours: int = 24) -> Dict[str, Any]:
        """Generate a performance report for the trailing time window.

        Args:
            hours: Size of the trailing window, in hours.

        Returns:
            Dict with overall totals/averages and a per-agent breakdown;
            a minimal dict with zero executions when the window is empty.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_executions = self.get_executions(start_time=cutoff_time)

        if not recent_executions:
            return {
                "period": f"Last {hours} hours",
                "total_executions": 0,
                "agents": {}
            }

        # Group executions by agent name.
        agent_executions: Dict[str, List[AgentExecution]] = {}
        for execution in recent_executions:
            agent_executions.setdefault(execution.agent_name, []).append(execution)

        agent_reports: Dict[str, Dict[str, Any]] = {}
        total_executions = 0
        total_successful = 0
        total_duration = 0.0
        total_cost = 0.0

        for agent_name, executions in agent_executions.items():
            successful = len([e for e in executions if e.success])
            total_dur = sum(e.duration for e in executions)
            total_cost_agent = sum(e.cost or 0 for e in executions)

            agent_reports[agent_name] = {
                "executions": len(executions),
                "successful": successful,
                "success_rate": (successful / len(executions)) * 100,
                "average_duration": total_dur / len(executions),
                "total_cost": total_cost_agent,
                # Average cost is only meaningful when some cost was recorded.
                "average_cost": total_cost_agent / len(executions) if total_cost_agent > 0 else 0
            }

            total_executions += len(executions)
            total_successful += successful
            total_duration += total_dur
            total_cost += total_cost_agent

        return {
            "period": f"Last {hours} hours",
            "total_executions": total_executions,
            "total_successful": total_successful,
            "overall_success_rate": (total_successful / total_executions) * 100 if total_executions > 0 else 0,
            "total_duration": total_duration,
            "average_duration": total_duration / total_executions if total_executions > 0 else 0,
            "total_cost": total_cost,
            "average_cost": total_cost / total_executions if total_cost > 0 and total_executions > 0 else 0,
            "agents": agent_reports
        }

    def get_quality_metrics(self) -> Dict[str, Any]:
        """Get quality-focused metrics for card generation.

        Aggregates card verdict counts from the metadata of successful
        executions whose agent name contains "judge". Per-execution counts
        are read from metadata keys "cards_judged" (defaults to 1),
        "accepted", "rejected", and "needs_revision" (default 0 each).
        """
        judge_executions = [
            e for e in self.executions
            if "judge" in e.agent_name.lower() and e.success
        ]

        if not judge_executions:
            return {"message": "No judge data available"}

        total_cards_judged = 0
        total_accepted = 0
        total_rejected = 0
        total_needs_revision = 0

        judge_stats: Dict[str, Dict[str, int]] = {}

        for execution in judge_executions:
            metadata = execution.metadata
            agent_name = execution.agent_name

            if agent_name not in judge_stats:
                judge_stats[agent_name] = {
                    "total_cards": 0,
                    "accepted": 0,
                    "rejected": 0,
                    "needs_revision": 0
                }

            cards_judged = metadata.get("cards_judged", 1)
            accepted = metadata.get("accepted", 0)
            rejected = metadata.get("rejected", 0)
            needs_revision = metadata.get("needs_revision", 0)

            judge_stats[agent_name]["total_cards"] += cards_judged
            judge_stats[agent_name]["accepted"] += accepted
            judge_stats[agent_name]["rejected"] += rejected
            judge_stats[agent_name]["needs_revision"] += needs_revision

            total_cards_judged += cards_judged
            total_accepted += accepted
            total_rejected += rejected
            total_needs_revision += needs_revision

        acceptance_rate = (total_accepted / total_cards_judged) * 100 if total_cards_judged > 0 else 0
        rejection_rate = (total_rejected / total_cards_judged) * 100 if total_cards_judged > 0 else 0
        revision_rate = (total_needs_revision / total_cards_judged) * 100 if total_cards_judged > 0 else 0

        return {
            "total_cards_judged": total_cards_judged,
            "acceptance_rate": acceptance_rate,
            "rejection_rate": rejection_rate,
            "revision_rate": revision_rate,
            "judge_breakdown": judge_stats
        }

    def _persist_execution(self, execution: AgentExecution):
        """Append one execution to the JSONL file for its start date.

        Failures are logged and swallowed: persistence is best-effort and
        must never break the recording path.
        """
        try:
            today = execution.start_time.strftime("%Y-%m-%d")
            file_path = self.persistence_dir / f"executions_{today}.jsonl"

            # Explicit encoding so the file is readable regardless of the
            # platform's default locale encoding.
            with open(file_path, 'a', encoding='utf-8') as f:
                f.write(json.dumps(execution.to_dict()) + '\n')

        except Exception as e:
            logger.error(f"Failed to persist execution: {e}")

    def _load_persisted_metrics(self):
        """Reload the last 7 days of persisted executions into memory.

        Unparseable lines are skipped with a warning so one corrupt record
        cannot block startup.
        """
        try:
            for i in range(7):
                date = datetime.now() - timedelta(days=i)
                date_str = date.strftime("%Y-%m-%d")
                file_path = self.persistence_dir / f"executions_{date_str}.jsonl"

                if file_path.exists():
                    with open(file_path, 'r', encoding='utf-8') as f:
                        for line in f:
                            try:
                                data = json.loads(line.strip())
                                execution = AgentExecution(
                                    agent_name=data["agent_name"],
                                    start_time=datetime.fromisoformat(data["start_time"]),
                                    end_time=datetime.fromisoformat(data["end_time"]),
                                    success=data["success"],
                                    input_tokens=data.get("input_tokens"),
                                    output_tokens=data.get("output_tokens"),
                                    cost=data.get("cost"),
                                    error_message=data.get("error_message"),
                                    metadata=data.get("metadata", {})
                                )
                                self.executions.append(execution)
                                self._update_agent_stats(execution)
                            except Exception as e:
                                logger.warning(f"Failed to parse execution record: {e}")

            logger.info(f"Loaded {len(self.executions)} persisted execution records")

        except Exception as e:
            logger.error(f"Failed to load persisted metrics: {e}")

    def cleanup_old_data(self, days: int = 30):
        """Drop in-memory and on-disk execution data older than ``days`` days.

        Rebuilds the per-agent aggregates from the retained executions and
        deletes persistence files whose date falls before the cutoff.
        """
        cutoff_time = datetime.now() - timedelta(days=days)

        self.executions = [e for e in self.executions if e.start_time >= cutoff_time]

        # Aggregates cannot be decremented reliably, so rebuild from scratch.
        self.agent_stats.clear()
        for execution in self.executions:
            self._update_agent_stats(execution)

        try:
            for file_path in self.persistence_dir.glob("executions_*.jsonl"):
                try:
                    # Stem looks like "executions_YYYY-MM-DD".
                    date_str = file_path.stem.split("_")[1]
                    file_date = datetime.strptime(date_str, "%Y-%m-%d")
                    if file_date < cutoff_time:
                        file_path.unlink()
                        logger.info(f"Removed old metrics file: {file_path}")
                except Exception as e:
                    logger.warning(f"Failed to process metrics file {file_path}: {e}")

        except Exception as e:
            logger.error(f"Failed to cleanup old metrics data: {e}")
|
|
|
|
|
|
|
_global_metrics: Optional[AgentMetrics] = None |
|
|
|
|
|
def get_metrics() -> AgentMetrics:
    """Return the global AgentMetrics instance, creating it on first use."""
    global _global_metrics
    instance = _global_metrics
    if instance is None:
        instance = AgentMetrics()
        _global_metrics = instance
    return instance
|
|
|
|
|
def record_agent_execution(
    agent_name: str,
    start_time: datetime,
    end_time: datetime,
    success: bool,
    **kwargs
):
    """Convenience function to record an agent execution.

    Forwards all arguments (including any extra keyword arguments such as
    input_tokens, output_tokens, cost, error_message, or metadata) to
    AgentMetrics.record_execution on the global collector.
    """
    get_metrics().record_execution(
        agent_name=agent_name,
        start_time=start_time,
        end_time=end_time,
        success=success,
        **kwargs
    )