# Listing header captured with this file: Spaces status "Sleeping"; file size 2,914 bytes; ref 22f1389.
# (The viewer's line-number gutter, 1-80, carried no content and has been dropped.)
from datetime import datetime
from agents.risk_agent import RiskAnalysisAgent
from agents.recommendation_agents import (
EmergencyRecommendationAgent,
HouseholdAdaptationAgent,
BusinessContinuityAgent,
FinancialAdaptationAgent,
)
class ClimateRiskOrchestrator:
    """Coordinate the risk-analysis agent and the recommendation agents.

    A single call to :meth:`analyze_and_recommend` runs the risk agent on the
    user's query, then hands its result (together with the user profile) to
    the recommendation agents. Emergency and financial recommendations are
    always requested; household and business recommendations depend on the
    profile's ``"type"`` value.
    """

    # Normalized (stripped, lower-cased) profile types that receive
    # household recommendations.
    _HOUSEHOLD_PROFILE_TYPES = frozenset(
        {"household", "residential", "general", "general public"}
    )
    # Normalized profile types that receive business recommendations.
    _BUSINESS_PROFILE_TYPES = frozenset({"business", "business owner"})
    # Used when the profile has no "type" key or its value is None.
    _DEFAULT_PROFILE_TYPE = "general"

    def __init__(self, model):
        """Create one agent of each kind, all built from the same ``model``.

        Args:
            model: Passed unchanged to every agent constructor. Its required
                type is defined by the agent classes, not by this module.
        """
        self.risk_agent = RiskAnalysisAgent(model)
        self.emergency_agent = EmergencyRecommendationAgent(model)
        self.household_agent = HouseholdAdaptationAgent(model)
        self.business_agent = BusinessContinuityAgent(model)
        self.financial_agent = FinancialAdaptationAgent(model)

    @classmethod
    def _normalize_profile_type(cls, user_profile: dict) -> str:
        """Return the profile's ``"type"`` as a stripped, lower-cased string.

        A missing key or an explicit ``None`` falls back to ``"general"``.
        Non-string values are stringified instead of raising, so an odd
        profile value cannot abort the whole analysis.
        """
        raw_type = user_profile.get("type")
        if raw_type is None:
            return cls._DEFAULT_PROFILE_TYPE
        return str(raw_type).strip().lower()

    def analyze_and_recommend(self, user_query: str, user_profile: dict) -> dict:
        """Run the risk analysis and collect the applicable recommendations.

        Progress messages are printed to standard output before each step.

        Args:
            user_query: Text forwarded to the risk agent's ``analyze_risks``.
            user_profile: Mapping forwarded to every recommendation agent.
                Its optional ``"type"`` entry selects the audience-specific
                recommendations (matched case-insensitively, ignoring
                surrounding whitespace).

        Returns:
            On success, a dict with ``"risk_analysis"``, ``"recommendations"``
            (keyed by ``"emergency"``, ``"financial"`` and, depending on the
            profile type, ``"household"`` and/or ``"business"``),
            ``"generated_at"`` (``datetime.now().isoformat()``) and
            ``"user_profile"``.

            If any step raises an ``Exception``, a dict with ``"error"``
            (``"Analysis failed: <message>"``), empty ``"risk_analysis"`` and
            ``"recommendations"``, and ``"generated_at"``. The exception is
            not re-raised.
        """
        try:
            print("🔍 Analyzing risks...")
            risk_analysis = self.risk_agent.analyze_risks(user_query)

            recommendations = {}

            print("🚨 Generating emergency recommendations...")
            recommendations["emergency"] = (
                self.emergency_agent.generate_emergency_recommendations(
                    risk_analysis, user_profile
                )
            )

            print("💰 Generating financial recommendations...")
            recommendations["financial"] = (
                self.financial_agent.generate_financial_recommendations(
                    risk_analysis, user_profile
                )
            )

            profile_type = self._normalize_profile_type(user_profile)

            if profile_type in self._HOUSEHOLD_PROFILE_TYPES:
                print("🏠 Generating household recommendations...")
                recommendations["household"] = (
                    self.household_agent.generate_household_recommendations(
                        risk_analysis, user_profile
                    )
                )

            if profile_type in self._BUSINESS_PROFILE_TYPES:
                print("🏢 Generating business recommendations...")
                recommendations["business"] = (
                    self.business_agent.generate_business_recommendations(
                        risk_analysis, user_profile
                    )
                )

            return {
                "risk_analysis": risk_analysis,
                "recommendations": recommendations,
                "generated_at": datetime.now().isoformat(),
                "user_profile": user_profile,
            }
        except Exception as e:
            # Deliberate top-level boundary: callers receive an error payload
            # with the same top-level shape instead of an exception.
            return {
                "error": f"Analysis failed: {e}",
                "risk_analysis": {},
                "recommendations": {},
                "generated_at": datetime.now().isoformat(),
            }
|