Spaces:
Sleeping
Sleeping
import json | |
import ast | |
from typing import List | |
import litellm | |
from smolagents import CodeAgent | |
from tools.tools import find_local_emergency_resources | |
class EmergencyRecommendationAgent: | |
"""Agent focused on emergency preparedness recommendations.""" | |
def __init__(self, model): | |
self.agent = CodeAgent( | |
tools=[find_local_emergency_resources], | |
model=model, | |
additional_authorized_imports=["json", "datetime"], | |
) | |
def generate_emergency_recommendations( | |
self, risk_analysis: dict, user_profile: dict | |
) -> List[str]: | |
"""Generate emergency preparedness recommendations.""" | |
prompt = f""" | |
You are an emergency preparedness expert. Based on this risk analysis and user profile, generate 3-7 specific, actionable emergency recommendations. | |
Risk Analysis: {str(risk_analysis)} | |
User Profile: {str(user_profile)} | |
Your recommendations should be: | |
- Specific and actionable | |
- Tailored to the identified risks | |
- Appropriate for the user's situation | |
- Prioritized by urgency/importance | |
Focus on immediate actions they can take to prepare for or mitigate the identified risks. | |
IMPORTANT: Return a simple Python list of strings, like this: | |
["Create an emergency kit with 72 hours of supplies", "Identify evacuation routes", "Install smoke detectors"] | |
Do not return JSON or any other format - just a Python list. | |
""" | |
try: | |
try: | |
response = self.agent.run(prompt) | |
except Exception as e: | |
print(e) | |
#response = self.agent.run(prompt) | |
#response = litellm.completion(messages=prompt, model="anthropic/claude-sonnet-4-20250514") | |
if isinstance(response, list): | |
return response | |
elif isinstance(response, str): | |
try: | |
return ast.literal_eval(response) | |
except (ValueError, SyntaxError): | |
try: | |
return json.loads(response) | |
except json.JSONDecodeError: | |
return self._extract_recommendations_from_text(response) | |
else: | |
return [ | |
"Prepare emergency supplies", | |
"Review evacuation plans", | |
"Monitor weather alerts", | |
] | |
except Exception as e: | |
print(f"Emergency recommendations error: {e}") | |
return [ | |
"Create an emergency kit with 72 hours of supplies", | |
"Identify and practice evacuation routes", | |
"Keep important documents in waterproof container", | |
"Monitor local emergency alerts and warnings", | |
] | |
def _extract_recommendations_from_text(self, text: str) -> List[str]: | |
"""Extract recommendations from text response.""" | |
lines = text.split("\n") | |
recommendations = [] | |
for line in lines: | |
line = line.strip() | |
if line and ( | |
line.startswith("-") or line.startswith("•") or line.startswith("*") | |
): | |
recommendations.append(line[1:].strip()) | |
elif line and line[0].isdigit() and "." in line: | |
recommendations.append(line.split(".", 1)[1].strip()) | |
return ( | |
recommendations[:7] | |
if recommendations | |
else ["Prepare emergency supplies", "Review evacuation plans"] | |
) | |
class HouseholdAdaptationAgent: | |
"""Agent for household-level climate adaptation recommendations.""" | |
def __init__(self, model): | |
self.agent = CodeAgent( | |
tools=[], model=model, additional_authorized_imports=["json"] | |
) | |
def generate_household_recommendations( | |
self, risk_analysis: dict, user_profile: dict | |
) -> List[str]: | |
"""Generate household adaptation recommendations.""" | |
prompt = f""" | |
You are a household climate adaptation specialist. Based on the risk analysis and user profile, generate 3-8 specific recommendations for household-level climate adaptations. | |
Risk Analysis: {str(risk_analysis)} | |
User Profile: {str(user_profile)} | |
Your recommendations should address: | |
- Home modifications for identified risks | |
- Energy efficiency improvements | |
- Comfort and health considerations | |
- Cost-effective solutions | |
- Long-term resilience building | |
Focus on practical, implementable actions that enhance the household's resilience to the identified climate risks. | |
IMPORTANT: Return a simple Python list of strings. | |
Do not return JSON - just a Python list. | |
""" | |
try: | |
try: | |
response = self.agent.run(prompt) | |
except Exception as e: | |
print(e) | |
#response = self.agent.run(prompt) | |
#response = litellm.completion(messages=prompt, model="anthropic/claude-sonnet-4-20250514") | |
if isinstance(response, list): | |
return response | |
elif isinstance(response, str): | |
try: | |
return ast.literal_eval(response) | |
except (ValueError, SyntaxError): | |
try: | |
return json.loads(response) | |
except json.JSONDecodeError: | |
return self._extract_recommendations_from_text(response) | |
else: | |
return [ | |
"Improve home insulation", | |
"Install efficient heating/cooling", | |
"Weather-proof windows and doors", | |
] | |
except Exception as e: | |
print(f"Household recommendations error: {e}") | |
return [ | |
"Improve home insulation to reduce energy costs", | |
"Install programmable thermostat", | |
"Weather-strip doors and windows", | |
"Consider backup power options", | |
] | |
def _extract_recommendations_from_text(self, text: str) -> List[str]: | |
"""Extract recommendations from text response.""" | |
lines = text.split("\n") | |
recommendations = [] | |
for line in lines: | |
line = line.strip() | |
if line and ( | |
line.startswith("-") or line.startswith("•") or line.startswith("*") | |
): | |
recommendations.append(line[1:].strip()) | |
elif line and line[0].isdigit() and "." in line: | |
recommendations.append(line.split(".", 1)[1].strip()) | |
return ( | |
recommendations[:8] | |
if recommendations | |
else ["Improve home insulation", "Install efficient heating/cooling"] | |
) | |
class BusinessContinuityAgent: | |
"""Agent for business continuity and adaptation recommendations.""" | |
def __init__(self, model): | |
self.agent = CodeAgent( | |
tools=[], model=model, additional_authorized_imports=["json"] | |
) | |
def generate_business_recommendations( | |
self, risk_analysis: dict, user_profile: dict | |
) -> List[str]: | |
"""Generate business continuity recommendations.""" | |
prompt = f""" | |
You are a business continuity and climate adaptation consultant. Generate 4-10 specific recommendations for business resilience based on the risk analysis and user profile. | |
Risk Analysis: {str(risk_analysis)} | |
User Profile: {str(user_profile)} | |
Consider: | |
- Operational continuity during climate events | |
- Supply chain resilience | |
- Infrastructure protection | |
- Employee safety | |
- Financial risk management | |
- Market opportunities in climate adaptation | |
Provide actionable, business-focused recommendations that address the specific risks identified. | |
IMPORTANT: Return a simple Python list of strings. | |
Do not return JSON - just a Python list. | |
""" | |
try: | |
try: | |
response = self.agent.run(prompt) | |
except Exception as e: | |
print(e) | |
#response = self.agent.run(prompt) | |
#response = litellm.completion(messages=prompt, model="anthropic/claude-sonnet-4-20250514") | |
if isinstance(response, list): | |
return response | |
elif isinstance(response, str): | |
try: | |
return ast.literal_eval(response) | |
except (ValueError, SyntaxError): | |
try: | |
return json.loads(response) | |
except json.JSONDecodeError: | |
return self._extract_recommendations_from_text(response) | |
else: | |
return [ | |
"Develop business continuity plan", | |
"Review insurance coverage", | |
"Diversify supply chains", | |
] | |
except Exception as e: | |
print(f"Business recommendations error: {e}") | |
return [ | |
"Develop comprehensive business continuity plan", | |
"Review and update insurance coverage", | |
"Diversify supply chain sources", | |
"Create employee safety protocols", | |
] | |
def _extract_recommendations_from_text(self, text: str) -> List[str]: | |
"""Extract recommendations from text response.""" | |
lines = text.split("\n") | |
recommendations = [] | |
for line in lines: | |
line = line.strip() | |
if line and ( | |
line.startswith("-") or line.startswith("•") or line.startswith("*") | |
): | |
recommendations.append(line[1:].strip()) | |
elif line and line[0].isdigit() and "." in line: | |
recommendations.append(line.split(".", 1)[1].strip()) | |
return ( | |
recommendations[:10] | |
if recommendations | |
else ["Develop business continuity plan", "Review insurance coverage"] | |
) | |
class FinancialAdaptationAgent: | |
"""Agent focused on financial planning and climate risk economics.""" | |
def __init__(self, model): | |
self.agent = CodeAgent( | |
tools=[], model=model, additional_authorized_imports=["json"] | |
) | |
def generate_financial_recommendations( | |
self, risk_analysis: dict, user_profile: dict | |
) -> List[str]: | |
"""Generate financial planning recommendations for climate risks.""" | |
prompt = f""" | |
You are a financial advisor specializing in climate risk management. Generate 4-7 specific financial recommendations based on the risk analysis. | |
Risk Analysis: {str(risk_analysis)} | |
User Profile: {str(user_profile)} | |
Address: | |
- Insurance coverage optimization | |
- Emergency fund planning | |
- Climate-resilient investments | |
- Government incentives and rebates | |
- Tax implications of adaptations | |
- Long-term financial planning for climate change | |
- Risk transfer mechanisms | |
Provide actionable financial strategies that help manage the economic impacts of identified climate risks. | |
IMPORTANT: Return a simple Python list of strings. | |
Do not return JSON - just a Python list. | |
""" | |
try: | |
try: | |
response = self.agent.run(prompt) | |
except Exception as e: | |
print(e) | |
#response = self.agent.run(prompt) | |
#response = litellm.completion(messages=prompt, model="anthropic/claude-sonnet-4-20250514") | |
if isinstance(response, list): | |
return response | |
elif isinstance(response, str): | |
try: | |
return ast.literal_eval(response) | |
except (ValueError, SyntaxError): | |
try: | |
return json.loads(response) | |
except json.JSONDecodeError: | |
return self._extract_recommendations_from_text(response) | |
else: | |
return [ | |
"Review insurance coverage", | |
"Build emergency fund", | |
"Explore tax incentives", | |
] | |
except Exception as e: | |
print(f"Financial recommendations error: {e}") | |
return [ | |
"Review and update insurance coverage for climate risks", | |
"Build emergency fund covering 3-6 months expenses", | |
"Explore government incentives for climate adaptations", | |
"Consider climate-resilient investment options", | |
] | |
def _extract_recommendations_from_text(self, text: str) -> List[str]: | |
"""Extract recommendations from text response.""" | |
lines = text.split("\n") | |
recommendations = [] | |
for line in lines: | |
line = line.strip() | |
if line and ( | |
line.startswith("-") or line.startswith("•") or line.startswith("*") | |
): | |
recommendations.append(line[1:].strip()) | |
elif line and line[0].isdigit() and "." in line: | |
recommendations.append(line.split(".", 1)[1].strip()) | |
return ( | |
recommendations[:7] | |
if recommendations | |
else ["Review insurance coverage", "Build emergency fund"] | |
) | |