|  |  | 
					
						
						|  |  | 
					
						
						|  | import time | 
					
						
						|  | from typing import Dict, Any, List, Optional | 
					
						
						|  | from dataclasses import dataclass, field | 
					
						
						|  | from datetime import datetime, timedelta | 
					
						
						|  | import json | 
					
						
						|  | from pathlib import Path | 
					
						
						|  |  | 
					
						
						|  | from ankigen_core.logging import logger | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @dataclass | 
					
						
						|  | class AgentExecution: | 
					
						
						|  | """Single agent execution record""" | 
					
						
						|  | agent_name: str | 
					
						
						|  | start_time: datetime | 
					
						
						|  | end_time: datetime | 
					
						
						|  | success: bool | 
					
						
						|  | input_tokens: Optional[int] = None | 
					
						
						|  | output_tokens: Optional[int] = None | 
					
						
						|  | cost: Optional[float] = None | 
					
						
						|  | error_message: Optional[str] = None | 
					
						
						|  | metadata: Dict[str, Any] = field(default_factory=dict) | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def duration(self) -> float: | 
					
						
						|  | """Execution duration in seconds""" | 
					
						
						|  | return (self.end_time - self.start_time).total_seconds() | 
					
						
						|  |  | 
					
						
						|  | def to_dict(self) -> Dict[str, Any]: | 
					
						
						|  | """Convert to dictionary for serialization""" | 
					
						
						|  | return { | 
					
						
						|  | "agent_name": self.agent_name, | 
					
						
						|  | "start_time": self.start_time.isoformat(), | 
					
						
						|  | "end_time": self.end_time.isoformat(), | 
					
						
						|  | "duration": self.duration, | 
					
						
						|  | "success": self.success, | 
					
						
						|  | "input_tokens": self.input_tokens, | 
					
						
						|  | "output_tokens": self.output_tokens, | 
					
						
						|  | "cost": self.cost, | 
					
						
						|  | "error_message": self.error_message, | 
					
						
						|  | "metadata": self.metadata | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @dataclass | 
					
						
						|  | class AgentStats: | 
					
						
						|  | """Aggregated statistics for an agent""" | 
					
						
						|  | agent_name: str | 
					
						
						|  | total_executions: int = 0 | 
					
						
						|  | successful_executions: int = 0 | 
					
						
						|  | total_duration: float = 0.0 | 
					
						
						|  | total_input_tokens: int = 0 | 
					
						
						|  | total_output_tokens: int = 0 | 
					
						
						|  | total_cost: float = 0.0 | 
					
						
						|  | error_count: int = 0 | 
					
						
						|  | last_execution: Optional[datetime] = None | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def success_rate(self) -> float: | 
					
						
						|  | """Success rate as percentage""" | 
					
						
						|  | if self.total_executions == 0: | 
					
						
						|  | return 0.0 | 
					
						
						|  | return (self.successful_executions / self.total_executions) * 100 | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def average_duration(self) -> float: | 
					
						
						|  | """Average execution duration in seconds""" | 
					
						
						|  | if self.total_executions == 0: | 
					
						
						|  | return 0.0 | 
					
						
						|  | return self.total_duration / self.total_executions | 
					
						
						|  |  | 
					
						
						|  | @property | 
					
						
						|  | def average_cost(self) -> float: | 
					
						
						|  | """Average cost per execution""" | 
					
						
						|  | if self.total_executions == 0: | 
					
						
						|  | return 0.0 | 
					
						
						|  | return self.total_cost / self.total_executions | 
					
						
						|  |  | 
					
						
						|  | def to_dict(self) -> Dict[str, Any]: | 
					
						
						|  | """Convert to dictionary for serialization""" | 
					
						
						|  | return { | 
					
						
						|  | "agent_name": self.agent_name, | 
					
						
						|  | "total_executions": self.total_executions, | 
					
						
						|  | "successful_executions": self.successful_executions, | 
					
						
						|  | "success_rate": self.success_rate, | 
					
						
						|  | "total_duration": self.total_duration, | 
					
						
						|  | "average_duration": self.average_duration, | 
					
						
						|  | "total_input_tokens": self.total_input_tokens, | 
					
						
						|  | "total_output_tokens": self.total_output_tokens, | 
					
						
						|  | "total_cost": self.total_cost, | 
					
						
						|  | "average_cost": self.average_cost, | 
					
						
						|  | "error_count": self.error_count, | 
					
						
						|  | "last_execution": self.last_execution.isoformat() if self.last_execution else None | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class AgentMetrics: | 
					
						
						|  | """Agent performance metrics collector and analyzer""" | 
					
						
						|  |  | 
					
						
						|  | def __init__(self, persistence_dir: Optional[str] = None): | 
					
						
						|  | self.persistence_dir = Path(persistence_dir) if persistence_dir else Path("metrics/agents") | 
					
						
						|  | self.persistence_dir.mkdir(parents=True, exist_ok=True) | 
					
						
						|  |  | 
					
						
						|  | self.executions: List[AgentExecution] = [] | 
					
						
						|  | self.agent_stats: Dict[str, AgentStats] = {} | 
					
						
						|  | self._load_persisted_metrics() | 
					
						
						|  |  | 
					
						
						|  | def record_execution( | 
					
						
						|  | self, | 
					
						
						|  | agent_name: str, | 
					
						
						|  | start_time: datetime, | 
					
						
						|  | end_time: datetime, | 
					
						
						|  | success: bool, | 
					
						
						|  | input_tokens: Optional[int] = None, | 
					
						
						|  | output_tokens: Optional[int] = None, | 
					
						
						|  | cost: Optional[float] = None, | 
					
						
						|  | error_message: Optional[str] = None, | 
					
						
						|  | metadata: Optional[Dict[str, Any]] = None | 
					
						
						|  | ): | 
					
						
						|  | """Record a single agent execution""" | 
					
						
						|  | execution = AgentExecution( | 
					
						
						|  | agent_name=agent_name, | 
					
						
						|  | start_time=start_time, | 
					
						
						|  | end_time=end_time, | 
					
						
						|  | success=success, | 
					
						
						|  | input_tokens=input_tokens, | 
					
						
						|  | output_tokens=output_tokens, | 
					
						
						|  | cost=cost, | 
					
						
						|  | error_message=error_message, | 
					
						
						|  | metadata=metadata or {} | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | self.executions.append(execution) | 
					
						
						|  | self._update_agent_stats(execution) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | self._persist_execution(execution) | 
					
						
						|  |  | 
					
						
						|  | logger.debug(f"Recorded execution for {agent_name}: {execution.duration:.2f}s, success={success}") | 
					
						
						|  |  | 
					
						
						|  | def _update_agent_stats(self, execution: AgentExecution): | 
					
						
						|  | """Update aggregated statistics for an agent""" | 
					
						
						|  | agent_name = execution.agent_name | 
					
						
						|  |  | 
					
						
						|  | if agent_name not in self.agent_stats: | 
					
						
						|  | self.agent_stats[agent_name] = AgentStats(agent_name=agent_name) | 
					
						
						|  |  | 
					
						
						|  | stats = self.agent_stats[agent_name] | 
					
						
						|  | stats.total_executions += 1 | 
					
						
						|  | stats.total_duration += execution.duration | 
					
						
						|  | stats.last_execution = execution.end_time | 
					
						
						|  |  | 
					
						
						|  | if execution.success: | 
					
						
						|  | stats.successful_executions += 1 | 
					
						
						|  | else: | 
					
						
						|  | stats.error_count += 1 | 
					
						
						|  |  | 
					
						
						|  | if execution.input_tokens: | 
					
						
						|  | stats.total_input_tokens += execution.input_tokens | 
					
						
						|  |  | 
					
						
						|  | if execution.output_tokens: | 
					
						
						|  | stats.total_output_tokens += execution.output_tokens | 
					
						
						|  |  | 
					
						
						|  | if execution.cost: | 
					
						
						|  | stats.total_cost += execution.cost | 
					
						
						|  |  | 
					
						
						|  | def get_agent_stats(self, agent_name: str) -> Optional[AgentStats]: | 
					
						
						|  | """Get statistics for a specific agent""" | 
					
						
						|  | return self.agent_stats.get(agent_name) | 
					
						
						|  |  | 
					
						
						|  | def get_all_agent_stats(self) -> Dict[str, AgentStats]: | 
					
						
						|  | """Get statistics for all agents""" | 
					
						
						|  | return self.agent_stats.copy() | 
					
						
						|  |  | 
					
						
						|  | def get_executions( | 
					
						
						|  | self, | 
					
						
						|  | agent_name: Optional[str] = None, | 
					
						
						|  | start_time: Optional[datetime] = None, | 
					
						
						|  | end_time: Optional[datetime] = None, | 
					
						
						|  | success_only: Optional[bool] = None | 
					
						
						|  | ) -> List[AgentExecution]: | 
					
						
						|  | """Get filtered execution records""" | 
					
						
						|  | filtered = self.executions | 
					
						
						|  |  | 
					
						
						|  | if agent_name: | 
					
						
						|  | filtered = [e for e in filtered if e.agent_name == agent_name] | 
					
						
						|  |  | 
					
						
						|  | if start_time: | 
					
						
						|  | filtered = [e for e in filtered if e.start_time >= start_time] | 
					
						
						|  |  | 
					
						
						|  | if end_time: | 
					
						
						|  | filtered = [e for e in filtered if e.end_time <= end_time] | 
					
						
						|  |  | 
					
						
						|  | if success_only is not None: | 
					
						
						|  | filtered = [e for e in filtered if e.success == success_only] | 
					
						
						|  |  | 
					
						
						|  | return filtered | 
					
						
						|  |  | 
					
						
						|  | def get_performance_report(self, hours: int = 24) -> Dict[str, Any]: | 
					
						
						|  | """Generate a performance report for the last N hours""" | 
					
						
						|  | cutoff_time = datetime.now() - timedelta(hours=hours) | 
					
						
						|  | recent_executions = self.get_executions(start_time=cutoff_time) | 
					
						
						|  |  | 
					
						
						|  | if not recent_executions: | 
					
						
						|  | return { | 
					
						
						|  | "period": f"Last {hours} hours", | 
					
						
						|  | "total_executions": 0, | 
					
						
						|  | "agents": {} | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | agent_executions = {} | 
					
						
						|  | for execution in recent_executions: | 
					
						
						|  | if execution.agent_name not in agent_executions: | 
					
						
						|  | agent_executions[execution.agent_name] = [] | 
					
						
						|  | agent_executions[execution.agent_name].append(execution) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | agent_reports = {} | 
					
						
						|  | total_executions = 0 | 
					
						
						|  | total_successful = 0 | 
					
						
						|  | total_duration = 0.0 | 
					
						
						|  | total_cost = 0.0 | 
					
						
						|  |  | 
					
						
						|  | for agent_name, executions in agent_executions.items(): | 
					
						
						|  | successful = len([e for e in executions if e.success]) | 
					
						
						|  | total_dur = sum(e.duration for e in executions) | 
					
						
						|  | total_cost_agent = sum(e.cost or 0 for e in executions) | 
					
						
						|  |  | 
					
						
						|  | agent_reports[agent_name] = { | 
					
						
						|  | "executions": len(executions), | 
					
						
						|  | "successful": successful, | 
					
						
						|  | "success_rate": (successful / len(executions)) * 100, | 
					
						
						|  | "average_duration": total_dur / len(executions), | 
					
						
						|  | "total_cost": total_cost_agent, | 
					
						
						|  | "average_cost": total_cost_agent / len(executions) if total_cost_agent > 0 else 0 | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | total_executions += len(executions) | 
					
						
						|  | total_successful += successful | 
					
						
						|  | total_duration += total_dur | 
					
						
						|  | total_cost += total_cost_agent | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "period": f"Last {hours} hours", | 
					
						
						|  | "total_executions": total_executions, | 
					
						
						|  | "total_successful": total_successful, | 
					
						
						|  | "overall_success_rate": (total_successful / total_executions) * 100 if total_executions > 0 else 0, | 
					
						
						|  | "total_duration": total_duration, | 
					
						
						|  | "average_duration": total_duration / total_executions if total_executions > 0 else 0, | 
					
						
						|  | "total_cost": total_cost, | 
					
						
						|  | "average_cost": total_cost / total_executions if total_cost > 0 and total_executions > 0 else 0, | 
					
						
						|  | "agents": agent_reports | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | def get_quality_metrics(self) -> Dict[str, Any]: | 
					
						
						|  | """Get quality-focused metrics for card generation""" | 
					
						
						|  |  | 
					
						
						|  | judge_executions = [ | 
					
						
						|  | e for e in self.executions | 
					
						
						|  | if "judge" in e.agent_name.lower() and e.success | 
					
						
						|  | ] | 
					
						
						|  |  | 
					
						
						|  | if not judge_executions: | 
					
						
						|  | return {"message": "No judge data available"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | total_cards_judged = 0 | 
					
						
						|  | total_accepted = 0 | 
					
						
						|  | total_rejected = 0 | 
					
						
						|  | total_needs_revision = 0 | 
					
						
						|  |  | 
					
						
						|  | judge_stats = {} | 
					
						
						|  |  | 
					
						
						|  | for execution in judge_executions: | 
					
						
						|  | metadata = execution.metadata | 
					
						
						|  | agent_name = execution.agent_name | 
					
						
						|  |  | 
					
						
						|  | if agent_name not in judge_stats: | 
					
						
						|  | judge_stats[agent_name] = { | 
					
						
						|  | "total_cards": 0, | 
					
						
						|  | "accepted": 0, | 
					
						
						|  | "rejected": 0, | 
					
						
						|  | "needs_revision": 0 | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | cards_judged = metadata.get("cards_judged", 1) | 
					
						
						|  | accepted = metadata.get("accepted", 0) | 
					
						
						|  | rejected = metadata.get("rejected", 0) | 
					
						
						|  | needs_revision = metadata.get("needs_revision", 0) | 
					
						
						|  |  | 
					
						
						|  | judge_stats[agent_name]["total_cards"] += cards_judged | 
					
						
						|  | judge_stats[agent_name]["accepted"] += accepted | 
					
						
						|  | judge_stats[agent_name]["rejected"] += rejected | 
					
						
						|  | judge_stats[agent_name]["needs_revision"] += needs_revision | 
					
						
						|  |  | 
					
						
						|  | total_cards_judged += cards_judged | 
					
						
						|  | total_accepted += accepted | 
					
						
						|  | total_rejected += rejected | 
					
						
						|  | total_needs_revision += needs_revision | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | acceptance_rate = (total_accepted / total_cards_judged) * 100 if total_cards_judged > 0 else 0 | 
					
						
						|  | rejection_rate = (total_rejected / total_cards_judged) * 100 if total_cards_judged > 0 else 0 | 
					
						
						|  | revision_rate = (total_needs_revision / total_cards_judged) * 100 if total_cards_judged > 0 else 0 | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "total_cards_judged": total_cards_judged, | 
					
						
						|  | "acceptance_rate": acceptance_rate, | 
					
						
						|  | "rejection_rate": rejection_rate, | 
					
						
						|  | "revision_rate": revision_rate, | 
					
						
						|  | "judge_breakdown": judge_stats | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  | def _persist_execution(self, execution: AgentExecution): | 
					
						
						|  | """Persist a single execution to disk""" | 
					
						
						|  | try: | 
					
						
						|  | today = execution.start_time.strftime("%Y-%m-%d") | 
					
						
						|  | file_path = self.persistence_dir / f"executions_{today}.jsonl" | 
					
						
						|  |  | 
					
						
						|  | with open(file_path, 'a') as f: | 
					
						
						|  | f.write(json.dumps(execution.to_dict()) + '\n') | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | logger.error(f"Failed to persist execution: {e}") | 
					
						
						|  |  | 
					
						
						|  | def _load_persisted_metrics(self): | 
					
						
						|  | """Load persisted metrics from disk""" | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | for i in range(7): | 
					
						
						|  | date = datetime.now() - timedelta(days=i) | 
					
						
						|  | date_str = date.strftime("%Y-%m-%d") | 
					
						
						|  | file_path = self.persistence_dir / f"executions_{date_str}.jsonl" | 
					
						
						|  |  | 
					
						
						|  | if file_path.exists(): | 
					
						
						|  | with open(file_path, 'r') as f: | 
					
						
						|  | for line in f: | 
					
						
						|  | try: | 
					
						
						|  | data = json.loads(line.strip()) | 
					
						
						|  | execution = AgentExecution( | 
					
						
						|  | agent_name=data["agent_name"], | 
					
						
						|  | start_time=datetime.fromisoformat(data["start_time"]), | 
					
						
						|  | end_time=datetime.fromisoformat(data["end_time"]), | 
					
						
						|  | success=data["success"], | 
					
						
						|  | input_tokens=data.get("input_tokens"), | 
					
						
						|  | output_tokens=data.get("output_tokens"), | 
					
						
						|  | cost=data.get("cost"), | 
					
						
						|  | error_message=data.get("error_message"), | 
					
						
						|  | metadata=data.get("metadata", {}) | 
					
						
						|  | ) | 
					
						
						|  | self.executions.append(execution) | 
					
						
						|  | self._update_agent_stats(execution) | 
					
						
						|  | except Exception as e: | 
					
						
						|  | logger.warning(f"Failed to parse execution record: {e}") | 
					
						
						|  |  | 
					
						
						|  | logger.info(f"Loaded {len(self.executions)} persisted execution records") | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | logger.error(f"Failed to load persisted metrics: {e}") | 
					
						
						|  |  | 
					
						
						|  | def cleanup_old_data(self, days: int = 30): | 
					
						
						|  | """Clean up execution data older than specified days""" | 
					
						
						|  | cutoff_time = datetime.now() - timedelta(days=days) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | self.executions = [e for e in self.executions if e.start_time >= cutoff_time] | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | self.agent_stats.clear() | 
					
						
						|  | for execution in self.executions: | 
					
						
						|  | self._update_agent_stats(execution) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | for file_path in self.persistence_dir.glob("executions_*.jsonl"): | 
					
						
						|  | try: | 
					
						
						|  | date_str = file_path.stem.split("_")[1] | 
					
						
						|  | file_date = datetime.strptime(date_str, "%Y-%m-%d") | 
					
						
						|  | if file_date < cutoff_time: | 
					
						
						|  | file_path.unlink() | 
					
						
						|  | logger.info(f"Removed old metrics file: {file_path}") | 
					
						
						|  | except Exception as e: | 
					
						
						|  | logger.warning(f"Failed to process metrics file {file_path}: {e}") | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | logger.error(f"Failed to cleanup old metrics data: {e}") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | _global_metrics: Optional[AgentMetrics] = None | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_metrics() -> AgentMetrics: | 
					
						
						|  | """Get the global agent metrics instance""" | 
					
						
						|  | global _global_metrics | 
					
						
						|  | if _global_metrics is None: | 
					
						
						|  | _global_metrics = AgentMetrics() | 
					
						
						|  | return _global_metrics | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def record_agent_execution( | 
					
						
						|  | agent_name: str, | 
					
						
						|  | start_time: datetime, | 
					
						
						|  | end_time: datetime, | 
					
						
						|  | success: bool, | 
					
						
						|  | **kwargs | 
					
						
						|  | ): | 
					
						
						|  | """Convenience function to record an agent execution""" | 
					
						
						|  | metrics = get_metrics() | 
					
						
						|  | metrics.record_execution( | 
					
						
						|  | agent_name=agent_name, | 
					
						
						|  | start_time=start_time, | 
					
						
						|  | end_time=end_time, | 
					
						
						|  | success=success, | 
					
						
						|  | **kwargs | 
					
						
						|  | ) |