cava-ai / agents /orchestrator.py
frankthtank's picture
move to dir
499489f verified
from datetime import datetime
from agents.risk_agent import RiskAnalysisAgent
from agents.recommendation_agents import (
EmergencyRecommendationAgent,
HouseholdAdaptationAgent,
BusinessContinuityAgent,
FinancialAdaptationAgent,
)
class ClimateRiskOrchestrator:
"""Main orchestrator that coordinates all agents."""
def __init__(self, model):
self.risk_agent = RiskAnalysisAgent(model)
self.emergency_agent = EmergencyRecommendationAgent(model)
self.household_agent = HouseholdAdaptationAgent(model)
self.business_agent = BusinessContinuityAgent(model)
self.financial_agent = FinancialAdaptationAgent(model)
def analyze_and_recommend(self, user_query: str, user_profile: dict) -> dict:
"""Main method that coordinates risk analysis and recommendation generation."""
try:
print("πŸ” Analyzing risks...")
risk_analysis = self.risk_agent.analyze_risks(user_query)
recommendations = {}
print("🚨 Generating emergency recommendations...")
recommendations["emergency"] = (
self.emergency_agent.generate_emergency_recommendations(
risk_analysis, user_profile
)
)
print("πŸ’° Generating financial recommendations...")
recommendations["financial"] = (
self.financial_agent.generate_financial_recommendations(
risk_analysis, user_profile
)
)
profile_type = user_profile.get("type", "general").lower()
if profile_type in [
"household",
"residential",
"general",
"general public",
]:
print("🏠 Generating household recommendations...")
recommendations["household"] = (
self.household_agent.generate_household_recommendations(
risk_analysis, user_profile
)
)
if profile_type in ["business", "business owner"]:
print("🏒 Generating business recommendations...")
recommendations["business"] = (
self.business_agent.generate_business_recommendations(
risk_analysis, user_profile
)
)
return {
"risk_analysis": risk_analysis,
"recommendations": recommendations,
"generated_at": datetime.now().isoformat(),
"user_profile": user_profile,
}
except Exception as e:
return {
"error": f"Analysis failed: {str(e)}",
"risk_analysis": {},
"recommendations": {},
"generated_at": datetime.now().isoformat(),
}