# ankigen/ankigen_core/agents/integration.py
# Main integration module for AnkiGen agent system
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime
from ankigen_core.logging import logger
from ankigen_core.models import Card
from ankigen_core.llm_interface import OpenAIClientManager
from .generators import GenerationCoordinator, SubjectExpertAgent
from .judges import JudgeCoordinator
from .enhancers import RevisionAgent, EnhancementAgent
class AgentOrchestrator:
"""Main orchestrator for the AnkiGen agent system"""
def __init__(self, client_manager: OpenAIClientManager):
self.client_manager = client_manager
self.openai_client = None
# Initialize coordinators
self.generation_coordinator = None
self.judge_coordinator = None
self.revision_agent = None
self.enhancement_agent = None
# All agents enabled by default
self.all_agents_enabled = True
    async def initialize(
        self, api_key: str, model_overrides: Optional[Dict[str, str]] = None
    ) -> None:
"""Initialize the agent system"""
try:
# Initialize OpenAI client
await self.client_manager.initialize_client(api_key)
self.openai_client = self.client_manager.get_client()
# Set up model overrides if provided
if model_overrides:
from ankigen_core.agents.config import get_config_manager
config_manager = get_config_manager()
config_manager.update_models(model_overrides)
logger.info(f"Applied model overrides: {model_overrides}")
# Initialize all agents
self.generation_coordinator = GenerationCoordinator(self.openai_client)
self.judge_coordinator = JudgeCoordinator(self.openai_client)
self.revision_agent = RevisionAgent(self.openai_client)
self.enhancement_agent = EnhancementAgent(self.openai_client)
logger.info("Agent system initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize agent system: {e}")
raise
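
    # Example initialization (a sketch; the override keys shown here are
    # assumptions, since the accepted keys are defined by
    # ankigen_core.agents.config):
    #
    #   orchestrator = AgentOrchestrator(client_manager)
    #   await orchestrator.initialize(
    #       api_key=api_key,
    #       model_overrides={"generation": "gpt-4o", "judge": "gpt-4o-mini"},
    #   )
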
async def generate_cards_with_agents(
self,
topic: str,
subject: str = "general",
num_cards: int = 5,
difficulty: str = "intermediate",
enable_quality_pipeline: bool = True,
        context: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Card], Dict[str, Any]]:
"""Generate cards using the agent system"""
start_time = datetime.now()
try:
            # The agent system must be initialized before generating cards
if not self.openai_client:
raise ValueError("Agent system not initialized")
logger.info(f"Starting agent-based card generation: {topic} ({subject})")
# Phase 1: Generation
cards = await self._generation_phase(
topic=topic,
subject=subject,
num_cards=num_cards,
difficulty=difficulty,
context=context,
)
# Phase 2: Quality Assessment
quality_results = {}
if enable_quality_pipeline and self.judge_coordinator:
cards, quality_results = await self._quality_phase(cards)
# Phase 3: Enhancement
if self.enhancement_agent:
cards = await self._enhancement_phase(cards)
# Collect metadata
metadata = {
"generation_method": "agent_system",
"generation_time": (datetime.now() - start_time).total_seconds(),
"cards_generated": len(cards),
"quality_results": quality_results,
"topic": topic,
"subject": subject,
"difficulty": difficulty,
}
logger.info(
f"Agent-based generation complete: {len(cards)} cards generated"
)
return cards, metadata
except Exception as e:
logger.error(f"Agent-based generation failed: {e}")
raise
async def _generation_phase(
self,
topic: str,
subject: str,
num_cards: int,
difficulty: str,
        context: Optional[Dict[str, Any]] = None,
) -> List[Card]:
"""Execute the card generation phase"""
if self.generation_coordinator:
# Use coordinated multi-agent generation
cards = await self.generation_coordinator.coordinate_generation(
topic=topic,
subject=subject,
num_cards=num_cards,
difficulty=difficulty,
enable_review=True,
enable_structuring=True,
context=context,
)
else:
# Use subject expert agent directly
subject_expert = SubjectExpertAgent(self.openai_client, subject)
cards = await subject_expert.generate_cards(
topic=topic, num_cards=num_cards, difficulty=difficulty, context=context
)
logger.info(f"Generation phase complete: {len(cards)} cards generated")
return cards
async def _quality_phase(
self, cards: List[Card]
) -> Tuple[List[Card], Dict[str, Any]]:
"""Execute the quality assessment and improvement phase"""
if not self.judge_coordinator:
return cards, {"message": "Judge coordinator not available"}
logger.info(f"Starting quality assessment for {len(cards)} cards")
# Judge all cards
judge_results = await self.judge_coordinator.coordinate_judgment(
cards=cards,
enable_parallel=True,
min_consensus=0.6,
)
# Separate approved and rejected cards
approved_cards = []
rejected_cards = []
for card, decisions, approved in judge_results:
if approved:
approved_cards.append(card)
else:
rejected_cards.append((card, decisions))
# Attempt to revise rejected cards
revised_cards = []
if self.revision_agent and rejected_cards:
logger.info(f"Attempting to revise {len(rejected_cards)} rejected cards")
for card, decisions in rejected_cards:
try:
revised_card = await self.revision_agent.revise_card(
card=card,
judge_decisions=decisions,
max_iterations=2,
)
# Re-judge the revised card
revision_results = await self.judge_coordinator.coordinate_judgment(
cards=[revised_card],
enable_parallel=False, # Single card, no need for parallel
min_consensus=0.6,
)
if revision_results and revision_results[0][2]: # If approved
revised_cards.append(revised_card)
else:
logger.warning(
f"Revised card still rejected: {card.front.question[:50]}..."
)
except Exception as e:
logger.error(f"Failed to revise card: {e}")
# Combine approved and successfully revised cards
final_cards = approved_cards + revised_cards
# Prepare quality results
quality_results = {
"total_cards_judged": len(cards),
"initially_approved": len(approved_cards),
"initially_rejected": len(rejected_cards),
"successfully_revised": len(revised_cards),
"final_approval_rate": len(final_cards) / len(cards) if cards else 0,
"judge_decisions": len(judge_results),
}
logger.info(
f"Quality phase complete: {len(final_cards)}/{len(cards)} cards approved"
)
return final_cards, quality_results
async def _enhancement_phase(self, cards: List[Card]) -> List[Card]:
"""Execute the enhancement phase"""
if not self.enhancement_agent:
return cards
logger.info(f"Starting enhancement for {len(cards)} cards")
enhanced_cards = await self.enhancement_agent.enhance_card_batch(
cards=cards, enhancement_targets=["explanation", "example", "metadata"]
)
logger.info(f"Enhancement phase complete: {len(enhanced_cards)} cards enhanced")
return enhanced_cards
def get_performance_metrics(self) -> Dict[str, Any]:
"""Get performance metrics for the agent system"""
# Basic performance info only
return {
"agents_enabled": True,
}
async def integrate_with_existing_workflow(
client_manager: OpenAIClientManager, api_key: str, **generation_params
) -> Tuple[List[Card], Dict[str, Any]]:
"""Integration point for existing AnkiGen workflow"""
    # Initialize the agent orchestrator and run the full generation pipeline
orchestrator = AgentOrchestrator(client_manager)
await orchestrator.initialize(api_key)
cards, metadata = await orchestrator.generate_cards_with_agents(**generation_params)
return cards, metadata
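

# Usage sketch (illustrative only; not part of the original module). The
# environment variable name, topic, and other parameters below are assumptions,
# and OpenAIClientManager is assumed to need no constructor arguments.
if __name__ == "__main__":
    import asyncio
    import os

    async def _demo() -> None:
        manager = OpenAIClientManager()
        cards, metadata = await integrate_with_existing_workflow(
            client_manager=manager,
            api_key=os.environ["OPENAI_API_KEY"],  # assumed env var name
            topic="Photosynthesis",
            subject="biology",
            num_cards=5,
            difficulty="intermediate",
        )
        print(f"Generated {len(cards)} cards; metadata keys: {sorted(metadata)}")

    asyncio.run(_demo())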