Spaces:
Sleeping
Sleeping
from datetime import datetime | |
from agents.risk_agent import RiskAnalysisAgent | |
from agents.recommendation_agents import ( | |
EmergencyRecommendationAgent, | |
HouseholdAdaptationAgent, | |
BusinessContinuityAgent, | |
FinancialAdaptationAgent, | |
) | |
class ClimateRiskOrchestrator:
    """Coordinate the risk-analysis agent and the recommendation agents.

    One agent of each kind is built from the same ``model`` object. A call
    to :meth:`analyze_and_recommend` runs the risk analysis first and then
    feeds its result, together with the user profile, to each applicable
    recommendation agent.
    """

    # Normalised (stripped, lower-cased) profile types that receive
    # household adaptation recommendations.
    _HOUSEHOLD_PROFILE_TYPES = frozenset(
        {"household", "residential", "general", "general public"}
    )
    # Normalised profile types that receive business continuity recommendations.
    _BUSINESS_PROFILE_TYPES = frozenset({"business", "business owner"})
    # Profile type assumed when the profile does not supply a usable one.
    _DEFAULT_PROFILE_TYPE = "general"

    def __init__(self, model):
        """Create one instance of every agent, all sharing ``model``."""
        self.risk_agent = RiskAnalysisAgent(model)
        self.emergency_agent = EmergencyRecommendationAgent(model)
        self.household_agent = HouseholdAdaptationAgent(model)
        self.business_agent = BusinessContinuityAgent(model)
        self.financial_agent = FinancialAdaptationAgent(model)

    @classmethod
    def _normalize_profile_type(cls, user_profile) -> str:
        """Return the profile's ``"type"`` stripped and lower-cased.

        Falls back to the default type when the profile is empty or ``None``,
        or when ``"type"`` is missing, ``None`` or blank, so that a malformed
        profile cannot abort the run after agent calls have been made.
        """
        raw_type = user_profile.get("type") if user_profile else None
        if raw_type is None:
            return cls._DEFAULT_PROFILE_TYPE
        cleaned = str(raw_type).strip().lower()
        return cleaned or cls._DEFAULT_PROFILE_TYPE

    def analyze_and_recommend(self, user_query: str, user_profile: dict) -> dict:
        """Run the risk analysis and collect recommendations for the profile.

        Emergency and financial recommendations are always generated.
        Household recommendations are added for household-like profile types
        (including the default), business recommendations for business-like
        ones. Progress messages are printed to standard output.

        Args:
            user_query: Text handed to the risk agent's ``analyze_risks``.
            user_profile: Profile passed unchanged to every recommendation
                agent; its optional ``"type"`` entry selects the
                profile-specific recommendations.

        Returns:
            On success, a dict with ``"risk_analysis"``, ``"recommendations"``
            (keyed by ``"emergency"``, ``"financial"`` and, where applicable,
            ``"household"`` / ``"business"``), ``"generated_at"`` (local
            ISO-8601 timestamp) and ``"user_profile"``. If any step raises,
            the dict instead holds ``"error"``, empty ``"risk_analysis"`` and
            ``"recommendations"``, ``"generated_at"`` and ``"user_profile"``.
        """
        try:
            print("🔍 Analyzing risks...")
            risk_analysis = self.risk_agent.analyze_risks(user_query)

            recommendations = {}

            print("🚨 Generating emergency recommendations...")
            recommendations["emergency"] = (
                self.emergency_agent.generate_emergency_recommendations(
                    risk_analysis, user_profile
                )
            )

            print("💰 Generating financial recommendations...")
            recommendations["financial"] = (
                self.financial_agent.generate_financial_recommendations(
                    risk_analysis, user_profile
                )
            )

            profile_type = self._normalize_profile_type(user_profile)

            if profile_type in self._HOUSEHOLD_PROFILE_TYPES:
                print("🏠 Generating household recommendations...")
                recommendations["household"] = (
                    self.household_agent.generate_household_recommendations(
                        risk_analysis, user_profile
                    )
                )

            if profile_type in self._BUSINESS_PROFILE_TYPES:
                print("🏢 Generating business recommendations...")
                recommendations["business"] = (
                    self.business_agent.generate_business_recommendations(
                        risk_analysis, user_profile
                    )
                )

            return {
                "risk_analysis": risk_analysis,
                "recommendations": recommendations,
                "generated_at": datetime.now().isoformat(),
                "user_profile": user_profile,
            }
        except Exception as e:
            # Deliberate top-level boundary: callers get an error payload
            # rather than an exception. It carries the same keys as the
            # success payload so consumers can read either shape uniformly.
            return {
                "error": f"Analysis failed: {str(e)}",
                "risk_analysis": {},
                "recommendations": {},
                "generated_at": datetime.now().isoformat(),
                "user_profile": user_profile,
            }