|
|
|
|
|
from typing import List, Dict, Any, Tuple |
|
from datetime import datetime |
|
|
|
|
|
from ankigen_core.logging import logger |
|
from ankigen_core.models import Card |
|
from ankigen_core.llm_interface import OpenAIClientManager |
|
|
|
from .generators import GenerationCoordinator, SubjectExpertAgent |
|
from .judges import JudgeCoordinator |
|
from .enhancers import RevisionAgent, EnhancementAgent |
|
|
|
|
|
class AgentOrchestrator: |
|
"""Main orchestrator for the AnkiGen agent system""" |
|
|
|
def __init__(self, client_manager: OpenAIClientManager): |
|
self.client_manager = client_manager |
|
self.openai_client = None |
|
|
|
|
|
self.generation_coordinator = None |
|
self.judge_coordinator = None |
|
self.revision_agent = None |
|
self.enhancement_agent = None |
|
|
|
|
|
self.all_agents_enabled = True |
|
|
|
async def initialize(self, api_key: str, model_overrides: Dict[str, str] = None): |
|
"""Initialize the agent system""" |
|
try: |
|
|
|
await self.client_manager.initialize_client(api_key) |
|
self.openai_client = self.client_manager.get_client() |
|
|
|
|
|
if model_overrides: |
|
from ankigen_core.agents.config import get_config_manager |
|
|
|
config_manager = get_config_manager() |
|
config_manager.update_models(model_overrides) |
|
logger.info(f"Applied model overrides: {model_overrides}") |
|
|
|
|
|
self.generation_coordinator = GenerationCoordinator(self.openai_client) |
|
self.judge_coordinator = JudgeCoordinator(self.openai_client) |
|
self.revision_agent = RevisionAgent(self.openai_client) |
|
self.enhancement_agent = EnhancementAgent(self.openai_client) |
|
|
|
logger.info("Agent system initialized successfully") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to initialize agent system: {e}") |
|
raise |
|
|
|
async def generate_cards_with_agents( |
|
self, |
|
topic: str, |
|
subject: str = "general", |
|
num_cards: int = 5, |
|
difficulty: str = "intermediate", |
|
enable_quality_pipeline: bool = True, |
|
context: Dict[str, Any] = None, |
|
) -> Tuple[List[Card], Dict[str, Any]]: |
|
"""Generate cards using the agent system""" |
|
start_time = datetime.now() |
|
|
|
try: |
|
|
|
|
|
if not self.openai_client: |
|
raise ValueError("Agent system not initialized") |
|
|
|
logger.info(f"Starting agent-based card generation: {topic} ({subject})") |
|
|
|
|
|
cards = await self._generation_phase( |
|
topic=topic, |
|
subject=subject, |
|
num_cards=num_cards, |
|
difficulty=difficulty, |
|
context=context, |
|
) |
|
|
|
|
|
quality_results = {} |
|
if enable_quality_pipeline and self.judge_coordinator: |
|
cards, quality_results = await self._quality_phase(cards) |
|
|
|
|
|
if self.enhancement_agent: |
|
cards = await self._enhancement_phase(cards) |
|
|
|
|
|
metadata = { |
|
"generation_method": "agent_system", |
|
"generation_time": (datetime.now() - start_time).total_seconds(), |
|
"cards_generated": len(cards), |
|
"quality_results": quality_results, |
|
"topic": topic, |
|
"subject": subject, |
|
"difficulty": difficulty, |
|
} |
|
|
|
logger.info( |
|
f"Agent-based generation complete: {len(cards)} cards generated" |
|
) |
|
return cards, metadata |
|
|
|
except Exception as e: |
|
logger.error(f"Agent-based generation failed: {e}") |
|
raise |
|
|
|
async def _generation_phase( |
|
self, |
|
topic: str, |
|
subject: str, |
|
num_cards: int, |
|
difficulty: str, |
|
context: Dict[str, Any] = None, |
|
) -> List[Card]: |
|
"""Execute the card generation phase""" |
|
|
|
if self.generation_coordinator: |
|
|
|
cards = await self.generation_coordinator.coordinate_generation( |
|
topic=topic, |
|
subject=subject, |
|
num_cards=num_cards, |
|
difficulty=difficulty, |
|
enable_review=True, |
|
enable_structuring=True, |
|
context=context, |
|
) |
|
else: |
|
|
|
subject_expert = SubjectExpertAgent(self.openai_client, subject) |
|
cards = await subject_expert.generate_cards( |
|
topic=topic, num_cards=num_cards, difficulty=difficulty, context=context |
|
) |
|
|
|
logger.info(f"Generation phase complete: {len(cards)} cards generated") |
|
return cards |
|
|
|
async def _quality_phase( |
|
self, cards: List[Card] |
|
) -> Tuple[List[Card], Dict[str, Any]]: |
|
"""Execute the quality assessment and improvement phase""" |
|
|
|
if not self.judge_coordinator: |
|
return cards, {"message": "Judge coordinator not available"} |
|
|
|
logger.info(f"Starting quality assessment for {len(cards)} cards") |
|
|
|
|
|
judge_results = await self.judge_coordinator.coordinate_judgment( |
|
cards=cards, |
|
enable_parallel=True, |
|
min_consensus=0.6, |
|
) |
|
|
|
|
|
approved_cards = [] |
|
rejected_cards = [] |
|
|
|
for card, decisions, approved in judge_results: |
|
if approved: |
|
approved_cards.append(card) |
|
else: |
|
rejected_cards.append((card, decisions)) |
|
|
|
|
|
revised_cards = [] |
|
if self.revision_agent and rejected_cards: |
|
logger.info(f"Attempting to revise {len(rejected_cards)} rejected cards") |
|
|
|
for card, decisions in rejected_cards: |
|
try: |
|
revised_card = await self.revision_agent.revise_card( |
|
card=card, |
|
judge_decisions=decisions, |
|
max_iterations=2, |
|
) |
|
|
|
|
|
revision_results = await self.judge_coordinator.coordinate_judgment( |
|
cards=[revised_card], |
|
enable_parallel=False, |
|
min_consensus=0.6, |
|
) |
|
|
|
if revision_results and revision_results[0][2]: |
|
revised_cards.append(revised_card) |
|
else: |
|
logger.warning( |
|
f"Revised card still rejected: {card.front.question[:50]}..." |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to revise card: {e}") |
|
|
|
|
|
final_cards = approved_cards + revised_cards |
|
|
|
|
|
quality_results = { |
|
"total_cards_judged": len(cards), |
|
"initially_approved": len(approved_cards), |
|
"initially_rejected": len(rejected_cards), |
|
"successfully_revised": len(revised_cards), |
|
"final_approval_rate": len(final_cards) / len(cards) if cards else 0, |
|
"judge_decisions": len(judge_results), |
|
} |
|
|
|
logger.info( |
|
f"Quality phase complete: {len(final_cards)}/{len(cards)} cards approved" |
|
) |
|
return final_cards, quality_results |
|
|
|
async def _enhancement_phase(self, cards: List[Card]) -> List[Card]: |
|
"""Execute the enhancement phase""" |
|
|
|
if not self.enhancement_agent: |
|
return cards |
|
|
|
logger.info(f"Starting enhancement for {len(cards)} cards") |
|
|
|
enhanced_cards = await self.enhancement_agent.enhance_card_batch( |
|
cards=cards, enhancement_targets=["explanation", "example", "metadata"] |
|
) |
|
|
|
logger.info(f"Enhancement phase complete: {len(enhanced_cards)} cards enhanced") |
|
return enhanced_cards |
|
|
|
def get_performance_metrics(self) -> Dict[str, Any]: |
|
"""Get performance metrics for the agent system""" |
|
|
|
|
|
return { |
|
"agents_enabled": True, |
|
} |
|
|
|
|
|
async def integrate_with_existing_workflow( |
|
client_manager: OpenAIClientManager, api_key: str, **generation_params |
|
) -> Tuple[List[Card], Dict[str, Any]]: |
|
"""Integration point for existing AnkiGen workflow""" |
|
|
|
|
|
|
|
|
|
orchestrator = AgentOrchestrator(client_manager) |
|
await orchestrator.initialize(api_key) |
|
|
|
cards, metadata = await orchestrator.generate_cards_with_agents(**generation_params) |
|
|
|
return cards, metadata |
|
|