mona / utils /integrations /weather.py
mrradix's picture
Upload 48 files
8e4018d verified
import requests
import json
from typing import Dict, List, Any, Optional, Union, Tuple
from datetime import datetime, timedelta
import time
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class WeatherIntegration:
"""Weather API integration for environmental data"""
def __init__(self, api_key: Optional[str] = None, provider: str = "openweathermap"):
"""Initialize Weather API integration
Args:
api_key: API key for the weather provider (optional)
provider: Weather data provider (default: openweathermap)
"""
# If no API key is provided, try to get it from settings
if api_key is None:
from utils.integrations import get_api_key
api_key = get_api_key("OpenWeatherMap")
self.api_key = api_key
self.provider = provider.lower()
self.cache = {}
self.cache_expiry = {}
# Set up provider-specific configurations
self.providers = {
"openweathermap": {
"current_url": "https://api.openweathermap.org/data/2.5/weather",
"forecast_url": "https://api.openweathermap.org/data/2.5/forecast",
"onecall_url": "https://api.openweathermap.org/data/3.0/onecall",
"geocoding_url": "https://api.openweathermap.org/geo/1.0/direct",
"cache_duration": 1800 # 30 minutes
},
"weatherapi": {
"base_url": "https://api.weatherapi.com/v1",
"current_url": "https://api.weatherapi.com/v1/current.json",
"forecast_url": "https://api.weatherapi.com/v1/forecast.json",
"search_url": "https://api.weatherapi.com/v1/search.json",
"cache_duration": 1800 # 30 minutes
}
}
@handle_exceptions
def set_api_key(self, api_key: str) -> None:
"""Set API key for the weather provider
Args:
api_key: API key
"""
self.api_key = api_key
# Clear cache when API key changes
self.cache = {}
self.cache_expiry = {}
@handle_exceptions
def set_provider(self, provider: str) -> None:
"""Set weather data provider
Args:
provider: Weather data provider
"""
provider = provider.lower()
if provider not in self.providers:
raise IntegrationError(f"Unsupported weather provider: {provider}")
self.provider = provider
# Clear cache when provider changes
self.cache = {}
self.cache_expiry = {}
@handle_exceptions
def test_connection(self) -> bool:
"""Test weather API connection
Returns:
True if connection is successful, False otherwise
"""
if not self.api_key:
logger.error("Weather API key not set")
return False
try:
import requests # Import here to avoid circular imports
if self.provider == "openweathermap":
# Test with a simple geocoding request
params = {
"q": "London",
"limit": 1,
"appid": self.api_key
}
response = requests.get(self.providers[self.provider]["geocoding_url"], params=params)
# Check if the request was successful
if response.status_code == 200:
return True
else:
logger.error(f"API test failed with status code {response.status_code}: {response.text}")
return False
elif self.provider == "weatherapi":
# Test with a simple search request
params = {
"key": self.api_key,
"q": "London"
}
response = requests.get(self.providers[self.provider]["search_url"], params=params)
return response.status_code == 200
except Exception as e:
logger.error(f"Weather API connection test failed: {str(e)}")
return False
@handle_exceptions
def get_current_weather(self, location: str, units: str = "metric") -> Dict[str, Any]:
"""Get current weather for a location
Args:
location: Location name or coordinates (lat,lon)
units: Units of measurement (metric, imperial, standard)
Returns:
Current weather data
"""
if not self.api_key:
raise IntegrationError("Weather API key not set")
# Check cache
cache_key = f"current_{location}_{units}_{self.provider}"
if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
return self.cache[cache_key]
try:
if self.provider == "openweathermap":
# Check if location is coordinates
if "," in location and all(part.replace(".", "").replace("-", "").isdigit() for part in location.split(",")):
lat, lon = location.split(",")
params = {
"lat": lat.strip(),
"lon": lon.strip(),
"appid": self.api_key,
"units": units
}
else:
params = {
"q": location,
"appid": self.api_key,
"units": units
}
response = requests.get(self.providers[self.provider]["current_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to get weather data: {response.text}")
data = response.json()
# Process data into a standardized format
weather_data = self._process_openweathermap_current(data)
elif self.provider == "weatherapi":
params = {
"key": self.api_key,
"q": location,
"aqi": "yes" # Include air quality data
}
response = requests.get(self.providers[self.provider]["current_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to get weather data: {response.text}")
data = response.json()
# Process data into a standardized format
weather_data = self._process_weatherapi_current(data, units)
else:
raise IntegrationError(f"Unsupported weather provider: {self.provider}")
# Cache the result
self.cache[cache_key] = weather_data
self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
return weather_data
except Exception as e:
if not isinstance(e, IntegrationError):
logger.error(f"Failed to get weather data: {str(e)}")
raise IntegrationError(f"Failed to get weather data: {str(e)}")
raise
@handle_exceptions
def get_weather_forecast(self, location: str, days: int = 5, units: str = "metric") -> Dict[str, Any]:
"""Get weather forecast for a location
Args:
location: Location name or coordinates (lat,lon)
days: Number of days to forecast (default: 5)
units: Units of measurement (metric, imperial, standard)
Returns:
Weather forecast data
"""
if not self.api_key:
raise IntegrationError("Weather API key not set")
# Check cache
cache_key = f"forecast_{location}_{days}_{units}_{self.provider}"
if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
return self.cache[cache_key]
try:
if self.provider == "openweathermap":
# For OpenWeatherMap, we need to get coordinates first if location is a name
coords = self._get_coordinates(location)
# Use OneCall API for better forecast data
params = {
"lat": coords[0],
"lon": coords[1],
"appid": self.api_key,
"units": units,
"exclude": "minutely,alerts"
}
response = requests.get(self.providers[self.provider]["onecall_url"], params=params)
if response.status_code != 200:
# Fall back to standard forecast API
if "," in location and all(part.replace(".", "").replace("-", "").isdigit() for part in location.split(",")):
lat, lon = location.split(",")
params = {
"lat": lat.strip(),
"lon": lon.strip(),
"appid": self.api_key,
"units": units,
"cnt": min(days * 8, 40) # 3-hour steps, max 5 days (40 steps)
}
else:
params = {
"q": location,
"appid": self.api_key,
"units": units,
"cnt": min(days * 8, 40) # 3-hour steps, max 5 days (40 steps)
}
response = requests.get(self.providers[self.provider]["forecast_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to get forecast data: {response.text}")
data = response.json()
forecast_data = self._process_openweathermap_forecast(data, days)
else:
data = response.json()
forecast_data = self._process_openweathermap_onecall(data, days)
elif self.provider == "weatherapi":
params = {
"key": self.api_key,
"q": location,
"days": min(days, 14), # WeatherAPI supports up to 14 days
"aqi": "yes", # Include air quality data
"alerts": "yes" # Include weather alerts
}
response = requests.get(self.providers[self.provider]["forecast_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to get forecast data: {response.text}")
data = response.json()
forecast_data = self._process_weatherapi_forecast(data, days, units)
else:
raise IntegrationError(f"Unsupported weather provider: {self.provider}")
# Cache the result
self.cache[cache_key] = forecast_data
self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
return forecast_data
except Exception as e:
if not isinstance(e, IntegrationError):
logger.error(f"Failed to get forecast data: {str(e)}")
raise IntegrationError(f"Failed to get forecast data: {str(e)}")
raise
@handle_exceptions
def search_locations(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Search for locations
Args:
query: Search query
limit: Maximum number of results (default: 5)
Returns:
List of location data
"""
if not self.api_key:
raise IntegrationError("Weather API key not set")
try:
if self.provider == "openweathermap":
params = {
"q": query,
"limit": limit,
"appid": self.api_key
}
response = requests.get(self.providers[self.provider]["geocoding_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to search locations: {response.text}")
data = response.json()
# Process data into a standardized format
locations = []
for item in data:
locations.append({
"name": item.get("name", ""),
"country": item.get("country", ""),
"state": item.get("state", ""),
"lat": item.get("lat"),
"lon": item.get("lon"),
"id": f"{item.get('lat')},{item.get('lon')}"
})
elif self.provider == "weatherapi":
params = {
"key": self.api_key,
"q": query
}
response = requests.get(self.providers[self.provider]["search_url"], params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to search locations: {response.text}")
data = response.json()
# Process data into a standardized format
locations = []
for item in data[:limit]:
locations.append({
"name": item.get("name", ""),
"country": item.get("country", ""),
"region": item.get("region", ""),
"lat": item.get("lat"),
"lon": item.get("lon"),
"id": f"{item.get('lat')},{item.get('lon')}"
})
else:
raise IntegrationError(f"Unsupported weather provider: {self.provider}")
return locations
except Exception as e:
if not isinstance(e, IntegrationError):
logger.error(f"Failed to search locations: {str(e)}")
raise IntegrationError(f"Failed to search locations: {str(e)}")
raise
@handle_exceptions
def get_air_quality(self, location: str) -> Dict[str, Any]:
"""Get air quality data for a location
Args:
location: Location name or coordinates (lat,lon)
Returns:
Air quality data
"""
if not self.api_key:
raise IntegrationError("Weather API key not set")
# Check cache
cache_key = f"air_{location}_{self.provider}"
if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
return self.cache[cache_key]
try:
if self.provider == "openweathermap":
# Get coordinates if location is a name
coords = self._get_coordinates(location)
# Use Air Pollution API
params = {
"lat": coords[0],
"lon": coords[1],
"appid": self.api_key
}
response = requests.get("https://api.openweathermap.org/data/2.5/air_pollution", params=params)
if response.status_code != 200:
raise IntegrationError(f"Failed to get air quality data: {response.text}")
data = response.json()
# Process data into a standardized format
if "list" in data and data["list"]:
air_data = data["list"][0]
components = air_data.get("components", {})
aqi = air_data.get("main", {}).get("aqi", 0)
# Map AQI value to category
aqi_categories = ["Good", "Fair", "Moderate", "Poor", "Very Poor"]
aqi_category = aqi_categories[aqi - 1] if 1 <= aqi <= 5 else "Unknown"
air_quality = {
"aqi": aqi,
"category": aqi_category,
"components": {
"co": components.get("co"), # Carbon monoxide (μg/m3)
"no": components.get("no"), # Nitrogen monoxide (μg/m3)
"no2": components.get("no2"), # Nitrogen dioxide (μg/m3)
"o3": components.get("o3"), # Ozone (μg/m3)
"so2": components.get("so2"), # Sulphur dioxide (μg/m3)
"pm2_5": components.get("pm2_5"), # Fine particles (μg/m3)
"pm10": components.get("pm10"), # Coarse particles (μg/m3)
"nh3": components.get("nh3") # Ammonia (μg/m3)
},
"timestamp": datetime.fromtimestamp(air_data.get("dt", 0)).isoformat()
}
else:
raise IntegrationError("No air quality data available")
elif self.provider == "weatherapi":
# WeatherAPI includes air quality in current weather
weather_data = self.get_current_weather(location)
if "air_quality" in weather_data:
air_quality = weather_data["air_quality"]
else:
raise IntegrationError("No air quality data available")
else:
raise IntegrationError(f"Unsupported weather provider: {self.provider}")
# Cache the result
self.cache[cache_key] = air_quality
self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
return air_quality
except Exception as e:
if not isinstance(e, IntegrationError):
logger.error(f"Failed to get air quality data: {str(e)}")
raise IntegrationError(f"Failed to get air quality data: {str(e)}")
raise
def _get_coordinates(self, location: str) -> Tuple[float, float]:
"""Get coordinates for a location name
Args:
location: Location name or coordinates
Returns:
Tuple of (latitude, longitude)
"""
# Check if location is already coordinates
if "," in location and all(part.replace(".", "").replace("-", "").isdigit() for part in location.split(",")):
lat, lon = location.split(",")
return float(lat.strip()), float(lon.strip())
# Search for location
locations = self.search_locations(location, 1)
if not locations:
raise IntegrationError(f"Location not found: {location}")
return locations[0]["lat"], locations[0]["lon"]
def _process_openweathermap_current(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process OpenWeatherMap current weather data
Args:
data: Raw API response data
Returns:
Processed weather data
"""
weather = data.get("weather", [{}])[0]
main = data.get("main", {})
wind = data.get("wind", {})
clouds = data.get("clouds", {})
rain = data.get("rain", {})
snow = data.get("snow", {})
sys = data.get("sys", {})
return {
"location": {
"name": data.get("name", ""),
"country": sys.get("country", ""),
"lat": data.get("coord", {}).get("lat"),
"lon": data.get("coord", {}).get("lon"),
"timezone": data.get("timezone", 0),
"sunrise": datetime.fromtimestamp(sys.get("sunrise", 0)).isoformat() if sys.get("sunrise") else None,
"sunset": datetime.fromtimestamp(sys.get("sunset", 0)).isoformat() if sys.get("sunset") else None
},
"current": {
"timestamp": datetime.fromtimestamp(data.get("dt", 0)).isoformat(),
"temp": main.get("temp"),
"feels_like": main.get("feels_like"),
"pressure": main.get("pressure"),
"humidity": main.get("humidity"),
"dew_point": None, # Not provided in this API
"uvi": None, # Not provided in this API
"clouds": clouds.get("all"),
"visibility": data.get("visibility"),
"wind_speed": wind.get("speed"),
"wind_deg": wind.get("deg"),
"wind_gust": wind.get("gust"),
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"rain_1h": rain.get("1h"),
"snow_1h": snow.get("1h")
}
}
def _process_openweathermap_forecast(self, data: Dict[str, Any], days: int) -> Dict[str, Any]:
"""Process OpenWeatherMap forecast data
Args:
data: Raw API response data
days: Number of days to forecast
Returns:
Processed forecast data
"""
city = data.get("city", {})
forecast_items = data.get("list", [])
# Group forecast items by day
daily_forecasts = {}
for item in forecast_items:
dt = datetime.fromtimestamp(item.get("dt", 0))
day_key = dt.strftime("%Y-%m-%d")
if day_key not in daily_forecasts:
daily_forecasts[day_key] = []
daily_forecasts[day_key].append(item)
# Process daily forecasts
daily = []
for day_key in sorted(daily_forecasts.keys())[:days]:
items = daily_forecasts[day_key]
# Calculate daily min/max temperatures
temps = [item.get("main", {}).get("temp") for item in items if "main" in item]
temp_min = min(temps) if temps else None
temp_max = max(temps) if temps else None
# Get weather for the middle of the day
middle_item = items[len(items) // 2] if items else {}
weather = middle_item.get("weather", [{}])[0] if "weather" in middle_item else {}
daily.append({
"dt": day_key,
"sunrise": None, # Not provided in this API
"sunset": None, # Not provided in this API
"temp": {
"day": None, # Not directly provided
"min": temp_min,
"max": temp_max,
"night": None, # Not directly provided
"eve": None, # Not directly provided
"morn": None # Not directly provided
},
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"hourly": [self._process_openweathermap_hourly(item) for item in items]
})
return {
"location": {
"name": city.get("name", ""),
"country": city.get("country", ""),
"lat": city.get("coord", {}).get("lat"),
"lon": city.get("coord", {}).get("lon"),
"timezone": city.get("timezone", 0)
},
"daily": daily
}
def _process_openweathermap_hourly(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Process OpenWeatherMap hourly forecast item
Args:
item: Hourly forecast item
Returns:
Processed hourly forecast
"""
weather = item.get("weather", [{}])[0] if "weather" in item else {}
main = item.get("main", {})
wind = item.get("wind", {})
clouds = item.get("clouds", {})
rain = item.get("rain", {})
snow = item.get("snow", {})
return {
"dt": datetime.fromtimestamp(item.get("dt", 0)).isoformat(),
"temp": main.get("temp"),
"feels_like": main.get("feels_like"),
"pressure": main.get("pressure"),
"humidity": main.get("humidity"),
"dew_point": None, # Not provided in this API
"uvi": None, # Not provided in this API
"clouds": clouds.get("all"),
"visibility": item.get("visibility"),
"wind_speed": wind.get("speed"),
"wind_deg": wind.get("deg"),
"wind_gust": wind.get("gust"),
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"pop": item.get("pop"), # Probability of precipitation
"rain": rain.get("3h"),
"snow": snow.get("3h")
}
def _process_openweathermap_onecall(self, data: Dict[str, Any], days: int) -> Dict[str, Any]:
"""Process OpenWeatherMap OneCall API data
Args:
data: Raw API response data
days: Number of days to forecast
Returns:
Processed forecast data
"""
daily_items = data.get("daily", [])[:days]
hourly_items = data.get("hourly", [])
# Group hourly items by day
hourly_by_day = {}
for item in hourly_items:
dt = datetime.fromtimestamp(item.get("dt", 0))
day_key = dt.strftime("%Y-%m-%d")
if day_key not in hourly_by_day:
hourly_by_day[day_key] = []
hourly_by_day[day_key].append(item)
# Process daily forecasts
daily = []
for item in daily_items:
dt = datetime.fromtimestamp(item.get("dt", 0))
day_key = dt.strftime("%Y-%m-%d")
weather = item.get("weather", [{}])[0] if "weather" in item else {}
temp = item.get("temp", {})
daily_data = {
"dt": day_key,
"sunrise": datetime.fromtimestamp(item.get("sunrise", 0)).isoformat() if item.get("sunrise") else None,
"sunset": datetime.fromtimestamp(item.get("sunset", 0)).isoformat() if item.get("sunset") else None,
"temp": {
"day": temp.get("day"),
"min": temp.get("min"),
"max": temp.get("max"),
"night": temp.get("night"),
"eve": temp.get("eve"),
"morn": temp.get("morn")
},
"feels_like": item.get("feels_like"),
"pressure": item.get("pressure"),
"humidity": item.get("humidity"),
"dew_point": item.get("dew_point"),
"wind_speed": item.get("wind_speed"),
"wind_deg": item.get("wind_deg"),
"wind_gust": item.get("wind_gust"),
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"clouds": item.get("clouds"),
"pop": item.get("pop"),
"rain": item.get("rain"),
"snow": item.get("snow"),
"uvi": item.get("uvi")
}
# Add hourly data if available
if day_key in hourly_by_day:
daily_data["hourly"] = [self._process_openweathermap_onecall_hourly(h) for h in hourly_by_day[day_key]]
daily.append(daily_data)
return {
"location": {
"lat": data.get("lat"),
"lon": data.get("lon"),
"timezone": data.get("timezone"),
"timezone_offset": data.get("timezone_offset")
},
"current": self._process_openweathermap_onecall_current(data.get("current", {})) if "current" in data else None,
"daily": daily
}
def _process_openweathermap_onecall_current(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Process OpenWeatherMap OneCall current weather data
Args:
item: Current weather item
Returns:
Processed current weather
"""
weather = item.get("weather", [{}])[0] if "weather" in item else {}
return {
"timestamp": datetime.fromtimestamp(item.get("dt", 0)).isoformat(),
"sunrise": datetime.fromtimestamp(item.get("sunrise", 0)).isoformat() if item.get("sunrise") else None,
"sunset": datetime.fromtimestamp(item.get("sunset", 0)).isoformat() if item.get("sunset") else None,
"temp": item.get("temp"),
"feels_like": item.get("feels_like"),
"pressure": item.get("pressure"),
"humidity": item.get("humidity"),
"dew_point": item.get("dew_point"),
"uvi": item.get("uvi"),
"clouds": item.get("clouds"),
"visibility": item.get("visibility"),
"wind_speed": item.get("wind_speed"),
"wind_deg": item.get("wind_deg"),
"wind_gust": item.get("wind_gust"),
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"rain_1h": item.get("rain", {}).get("1h") if "rain" in item else None,
"snow_1h": item.get("snow", {}).get("1h") if "snow" in item else None
}
def _process_openweathermap_onecall_hourly(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Process OpenWeatherMap OneCall hourly forecast item
Args:
item: Hourly forecast item
Returns:
Processed hourly forecast
"""
weather = item.get("weather", [{}])[0] if "weather" in item else {}
return {
"dt": datetime.fromtimestamp(item.get("dt", 0)).isoformat(),
"temp": item.get("temp"),
"feels_like": item.get("feels_like"),
"pressure": item.get("pressure"),
"humidity": item.get("humidity"),
"dew_point": item.get("dew_point"),
"uvi": item.get("uvi"),
"clouds": item.get("clouds"),
"visibility": item.get("visibility"),
"wind_speed": item.get("wind_speed"),
"wind_deg": item.get("wind_deg"),
"wind_gust": item.get("wind_gust"),
"weather": {
"id": weather.get("id"),
"main": weather.get("main"),
"description": weather.get("description"),
"icon": weather.get("icon")
},
"pop": item.get("pop"),
"rain_1h": item.get("rain", {}).get("1h") if "rain" in item else None,
"snow_1h": item.get("snow", {}).get("1h") if "snow" in item else None
}
def _process_weatherapi_current(self, data: Dict[str, Any], units: str) -> Dict[str, Any]:
"""Process WeatherAPI current weather data
Args:
data: Raw API response data
units: Units of measurement
Returns:
Processed weather data
"""
location = data.get("location", {})
current = data.get("current", {})
condition = current.get("condition", {})
air_quality = current.get("air_quality", {})
# Convert units if needed
temp_field = "temp_c" if units == "metric" else "temp_f"
feelslike_field = "feelslike_c" if units == "metric" else "feelslike_f"
wind_speed_field = "wind_kph" if units == "metric" else "wind_mph"
wind_gust_field = "gust_kph" if units == "metric" else "gust_mph"
vis_field = "vis_km" if units == "metric" else "vis_miles"
precip_field = "precip_mm" if units == "metric" else "precip_in"
# Process air quality data if available
air_quality_data = None
if air_quality:
# Calculate US EPA index if not provided
us_epa_index = air_quality.get("us-epa-index", 0)
# Map EPA index to category
epa_categories = ["Good", "Moderate", "Unhealthy for Sensitive Groups", "Unhealthy", "Very Unhealthy", "Hazardous"]
epa_category = epa_categories[us_epa_index - 1] if 1 <= us_epa_index <= 6 else "Unknown"
air_quality_data = {
"aqi": us_epa_index,
"category": epa_category,
"components": {
"co": air_quality.get("co"),
"no2": air_quality.get("no2"),
"o3": air_quality.get("o3"),
"so2": air_quality.get("so2"),
"pm2_5": air_quality.get("pm2_5"),
"pm10": air_quality.get("pm10")
}
}
return {
"location": {
"name": location.get("name", ""),
"region": location.get("region", ""),
"country": location.get("country", ""),
"lat": location.get("lat"),
"lon": location.get("lon"),
"timezone": location.get("tz_id"),
"localtime": location.get("localtime"),
},
"current": {
"timestamp": datetime.fromtimestamp(current.get("last_updated_epoch", 0)).isoformat(),
"temp": current.get(temp_field),
"feels_like": current.get(feelslike_field),
"condition": {
"text": condition.get("text"),
"icon": condition.get("icon"),
"code": condition.get("code")
},
"wind_speed": current.get(wind_speed_field),
"wind_deg": current.get("wind_degree"),
"wind_dir": current.get("wind_dir"),
"wind_gust": current.get(wind_gust_field),
"pressure": current.get("pressure_mb"),
"humidity": current.get("humidity"),
"cloud": current.get("cloud"),
"visibility": current.get(vis_field),
"precipitation": current.get(precip_field),
"uv": current.get("uv")
},
"air_quality": air_quality_data
}
def _process_weatherapi_forecast(self, data: Dict[str, Any], days: int, units: str) -> Dict[str, Any]:
"""Process WeatherAPI forecast data
Args:
data: Raw API response data
days: Number of days to forecast
units: Units of measurement
Returns:
Processed forecast data
"""
location = data.get("location", {})
forecast_days = data.get("forecast", {}).get("forecastday", [])[:days]
# Process daily forecasts
daily = []
for day_data in forecast_days:
day = day_data.get("day", {})
astro = day_data.get("astro", {})
condition = day.get("condition", {})
# Convert units if needed
temp_min_field = "mintemp_c" if units == "metric" else "mintemp_f"
temp_max_field = "maxtemp_c" if units == "metric" else "maxtemp_f"
avg_temp_field = "avgtemp_c" if units == "metric" else "avgtemp_f"
wind_speed_field = "maxwind_kph" if units == "metric" else "maxwind_mph"
vis_field = "avgvis_km" if units == "metric" else "avgvis_miles"
precip_field = "totalprecip_mm" if units == "metric" else "totalprecip_in"
# Process hourly data
hourly = []
for hour_data in day_data.get("hour", []):
hour_condition = hour_data.get("condition", {})
# Convert units for hourly data
hour_temp_field = "temp_c" if units == "metric" else "temp_f"
hour_feelslike_field = "feelslike_c" if units == "metric" else "feelslike_f"
hour_wind_speed_field = "wind_kph" if units == "metric" else "wind_mph"
hour_wind_gust_field = "gust_kph" if units == "metric" else "gust_mph"
hour_vis_field = "vis_km" if units == "metric" else "vis_miles"
hour_precip_field = "precip_mm" if units == "metric" else "precip_in"
hourly.append({
"time": hour_data.get("time"),
"temp": hour_data.get(hour_temp_field),
"feels_like": hour_data.get(hour_feelslike_field),
"condition": {
"text": hour_condition.get("text"),
"icon": hour_condition.get("icon"),
"code": hour_condition.get("code")
},
"wind_speed": hour_data.get(hour_wind_speed_field),
"wind_deg": hour_data.get("wind_degree"),
"wind_dir": hour_data.get("wind_dir"),
"wind_gust": hour_data.get(hour_wind_gust_field),
"pressure": hour_data.get("pressure_mb"),
"humidity": hour_data.get("humidity"),
"cloud": hour_data.get("cloud"),
"visibility": hour_data.get(hour_vis_field),
"precipitation": hour_data.get(hour_precip_field),
"chance_of_rain": hour_data.get("chance_of_rain"),
"chance_of_snow": hour_data.get("chance_of_snow"),
"uv": hour_data.get("uv")
})
daily.append({
"date": day_data.get("date"),
"temp_min": day.get(temp_min_field),
"temp_max": day.get(temp_max_field),
"temp_avg": day.get(avg_temp_field),
"condition": {
"text": condition.get("text"),
"icon": condition.get("icon"),
"code": condition.get("code")
},
"wind_speed": day.get(wind_speed_field),
"wind_deg": None, # Not provided in day summary
"wind_dir": None, # Not provided in day summary
"humidity": day.get("avghumidity"),
"visibility": day.get(vis_field),
"precipitation": day.get(precip_field),
"chance_of_rain": day.get("daily_chance_of_rain"),
"chance_of_snow": day.get("daily_chance_of_snow"),
"uv": day.get("uv"),
"sunrise": astro.get("sunrise"),
"sunset": astro.get("sunset"),
"moonrise": astro.get("moonrise"),
"moonset": astro.get("moonset"),
"moon_phase": astro.get("moon_phase"),
"hourly": hourly
})
return {
"location": {
"name": location.get("name", ""),
"region": location.get("region", ""),
"country": location.get("country", ""),
"lat": location.get("lat"),
"lon": location.get("lon"),
"timezone": location.get("tz_id"),
"localtime": location.get("localtime"),
},
"current": self._process_weatherapi_current(data, units)["current"] if "current" in data else None,
"daily": daily,
"alerts": data.get("alerts", {}).get("alert", [])
}