import json
import re
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Union, Tuple

import requests

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)

# One signed decimal number ("51", "-0.12", "5.", ".5"); used to recognise "lat,lon" strings
_COORDINATE_PART = re.compile(r"-?(?:\d+\.?\d*|\.\d+)")


class WeatherIntegration:
    """Weather API integration for environmental data

    Wraps two HTTP weather providers ("openweathermap" and "weatherapi") and
    converts their responses into plain dictionaries. Successful responses of
    the current-weather, forecast and air-quality calls are cached in memory
    for the provider's "cache_duration" seconds.

    Timestamps derived from epoch values are produced with
    ``datetime.fromtimestamp`` and are therefore naive datetimes in the local
    timezone of the running process, not of the requested location.
    """

    # Seconds to wait for a provider response when no timeout is supplied
    DEFAULT_TIMEOUT = 10

    def __init__(self, api_key: Optional[str] = None, provider: str = "openweathermap",
                 timeout: float = DEFAULT_TIMEOUT):
        """Initialize Weather API integration

        Args:
            api_key: API key for the weather provider (optional)
            provider: Weather data provider (default: openweathermap)
            timeout: Seconds to wait for each HTTP request before failing
        """
        # If no API key is provided, try to get it from settings
        # NOTE(review): the settings lookup always uses the "OpenWeatherMap" entry,
        # even when provider is "weatherapi" - confirm this is intended.
        if api_key is None:
            from utils.integrations import get_api_key
            api_key = get_api_key("OpenWeatherMap")

        self.api_key = api_key
        self.provider = provider.lower()
        self.timeout = timeout
        self.cache = {}
        self.cache_expiry = {}

        # Set up provider-specific configurations
        self.providers = {
            "openweathermap": {
                "current_url": "https://api.openweathermap.org/data/2.5/weather",
                "forecast_url": "https://api.openweathermap.org/data/2.5/forecast",
                "onecall_url": "https://api.openweathermap.org/data/3.0/onecall",
                "geocoding_url": "https://api.openweathermap.org/geo/1.0/direct",
                "air_pollution_url": "https://api.openweathermap.org/data/2.5/air_pollution",
                "cache_duration": 1800  # 30 minutes
            },
            "weatherapi": {
                "base_url": "https://api.weatherapi.com/v1",
                "current_url": "https://api.weatherapi.com/v1/current.json",
                "forecast_url": "https://api.weatherapi.com/v1/forecast.json",
                "search_url": "https://api.weatherapi.com/v1/search.json",
                "cache_duration": 1800  # 30 minutes
            }
        }

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the weather provider

        Args:
            api_key: API key
        """
        self.api_key = api_key

        # Clear cache when API key changes
        self.cache = {}
        self.cache_expiry = {}

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set weather data provider

        Args:
            provider: Weather data provider

        Raises:
            IntegrationError: If the provider is not configured in self.providers
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported weather provider: {provider}")

        self.provider = provider

        # Clear cache when provider changes
        self.cache = {}
        self.cache_expiry = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _request(self, url_key: str, params: Dict[str, Any]) -> requests.Response:
        """Send a GET request to one of the current provider's endpoints

        Args:
            url_key: Key of the endpoint in the current provider's configuration
            params: Query parameters

        Returns:
            The raw HTTP response (status is not checked here)
        """
        url = self.providers[self.provider][url_key]
        # Always pass a timeout: requests waits indefinitely without one
        return requests.get(url, params=params, timeout=self.timeout)

    def _fetch_json(self, url_key: str, params: Dict[str, Any], error_message: str) -> Any:
        """Request an endpoint and return its decoded JSON body

        Args:
            url_key: Key of the endpoint in the current provider's configuration
            params: Query parameters
            error_message: Prefix of the error raised on a non-200 response

        Returns:
            Decoded JSON payload

        Raises:
            IntegrationError: If the response status code is not 200
        """
        response = self._request(url_key, params)
        if response.status_code != 200:
            raise IntegrationError(f"{error_message}: {response.text}")
        return response.json()

    def _cache_get(self, cache_key: str) -> Optional[Any]:
        """Return the cached value for cache_key, or None when absent or expired"""
        if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
            return self.cache[cache_key]
        return None

    def _cache_set(self, cache_key: str, value: Any) -> None:
        """Store value under cache_key for the current provider's cache duration"""
        self.cache[cache_key] = value
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]

    @staticmethod
    def _parse_coordinates(location: str) -> Optional[Tuple[float, float]]:
        """Parse a "lat,lon" string

        Whitespace around either number is ignored, so "51.5, -0.12" is
        recognised as coordinates.

        Args:
            location: Location name or coordinates

        Returns:
            (latitude, longitude) when location is exactly two decimal numbers
            separated by a comma, otherwise None
        """
        parts = [part.strip() for part in location.split(",")]
        if len(parts) != 2:
            return None
        if not all(_COORDINATE_PART.fullmatch(part) for part in parts):
            return None
        return float(parts[0]), float(parts[1])

    @staticmethod
    def _first_weather(item: Dict[str, Any]) -> Dict[str, Any]:
        """Return the first entry of item["weather"], or {} when missing or empty"""
        entries = item.get("weather") or [{}]
        return entries[0]

    @staticmethod
    def _weather_summary(weather: Dict[str, Any]) -> Dict[str, Any]:
        """Build the standardized OpenWeatherMap weather-condition dictionary"""
        return {
            "id": weather.get("id"),
            "main": weather.get("main"),
            "description": weather.get("description"),
            "icon": weather.get("icon")
        }

    @staticmethod
    def _condition_summary(condition: Dict[str, Any]) -> Dict[str, Any]:
        """Build the standardized WeatherAPI condition dictionary"""
        return {
            "text": condition.get("text"),
            "icon": condition.get("icon"),
            "code": condition.get("code")
        }

    @staticmethod
    def _weatherapi_location(location: Dict[str, Any]) -> Dict[str, Any]:
        """Build the standardized location dictionary from a WeatherAPI location"""
        return {
            "name": location.get("name", ""),
            "region": location.get("region", ""),
            "country": location.get("country", ""),
            "lat": location.get("lat"),
            "lon": location.get("lon"),
            "timezone": location.get("tz_id"),
            "localtime": location.get("localtime"),
        }

    @staticmethod
    def _to_iso(epoch: float) -> str:
        """Convert an epoch timestamp to an ISO string (naive, process-local time)"""
        return datetime.fromtimestamp(epoch).isoformat()

    @staticmethod
    def _to_iso_or_none(epoch: Optional[float]) -> Optional[str]:
        """Like _to_iso, but returns None for a missing or zero timestamp"""
        return datetime.fromtimestamp(epoch).isoformat() if epoch else None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test weather API connection

        Returns:
            True if connection is successful, False otherwise (including when
            no API key is set or the provider is not supported)
        """
        if not self.api_key:
            logger.error("Weather API key not set")
            return False

        try:
            if self.provider == "openweathermap":
                # Test with a simple geocoding request
                params = {"q": "London", "limit": 1, "appid": self.api_key}
                response = self._request("geocoding_url", params)
            elif self.provider == "weatherapi":
                # Test with a simple search request
                params = {"key": self.api_key, "q": "London"}
                response = self._request("search_url", params)
            else:
                # Previously this path fell through and returned None
                logger.error(f"Unsupported weather provider: {self.provider}")
                return False

            # Check if the request was successful
            if response.status_code == 200:
                return True

            logger.error(f"API test failed with status code {response.status_code}: {response.text}")
            return False
        except Exception as e:
            logger.error(f"Weather API connection test failed: {str(e)}")
            return False

    @handle_exceptions
    def get_current_weather(self, location: str, units: str = "metric") -> Dict[str, Any]:
        """Get current weather for a location

        Args:
            location: Location name or coordinates (lat,lon)
            units: Units of measurement (metric, imperial, standard)

        Returns:
            Current weather data

        Raises:
            IntegrationError: If no API key is set, the provider is unsupported,
                the API answers with a non-200 status, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("Weather API key not set")

        # Check cache
        cache_key = f"current_{location}_{units}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "openweathermap":
                # Query by coordinates when possible, otherwise by name
                coords = self._parse_coordinates(location)
                if coords is not None:
                    params = {
                        "lat": coords[0],
                        "lon": coords[1],
                        "appid": self.api_key,
                        "units": units
                    }
                else:
                    params = {
                        "q": location,
                        "appid": self.api_key,
                        "units": units
                    }

                data = self._fetch_json("current_url", params, "Failed to get weather data")

                # Process data into a standardized format
                weather_data = self._process_openweathermap_current(data)

            elif self.provider == "weatherapi":
                params = {
                    "key": self.api_key,
                    "q": location,
                    "aqi": "yes"  # Include air quality data
                }

                data = self._fetch_json("current_url", params, "Failed to get weather data")

                # Process data into a standardized format
                weather_data = self._process_weatherapi_current(data, units)

            else:
                raise IntegrationError(f"Unsupported weather provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, weather_data)
            return weather_data

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get weather data: {str(e)}")
            raise IntegrationError(f"Failed to get weather data: {str(e)}") from e

    @handle_exceptions
    def get_weather_forecast(self, location: str, days: int = 5, units: str = "metric") -> Dict[str, Any]:
        """Get weather forecast for a location

        For OpenWeatherMap the OneCall endpoint is tried first; when it does
        not answer with status 200 the standard 3-hour forecast endpoint is
        used instead.

        Args:
            location: Location name or coordinates (lat,lon)
            days: Number of days to forecast (default: 5)
            units: Units of measurement (metric, imperial, standard)

        Returns:
            Weather forecast data

        Raises:
            IntegrationError: If no API key is set, the provider is unsupported,
                the location cannot be resolved, the API answers with a
                non-200 status, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("Weather API key not set")

        # Check cache
        cache_key = f"forecast_{location}_{days}_{units}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "openweathermap":
                # For OpenWeatherMap, we need to get coordinates first if location is a name
                coords = self._get_coordinates(location)

                # Use OneCall API for better forecast data
                params = {
                    "lat": coords[0],
                    "lon": coords[1],
                    "appid": self.api_key,
                    "units": units,
                    "exclude": "minutely,alerts"
                }
                response = self._request("onecall_url", params)

                if response.status_code == 200:
                    forecast_data = self._process_openweathermap_onecall(response.json(), days)
                else:
                    # Fall back to standard forecast API
                    step_count = min(days * 8, 40)  # 3-hour steps, max 5 days (40 steps)
                    parsed = self._parse_coordinates(location)
                    if parsed is not None:
                        params = {
                            "lat": parsed[0],
                            "lon": parsed[1],
                            "appid": self.api_key,
                            "units": units,
                            "cnt": step_count
                        }
                    else:
                        params = {
                            "q": location,
                            "appid": self.api_key,
                            "units": units,
                            "cnt": step_count
                        }

                    data = self._fetch_json("forecast_url", params, "Failed to get forecast data")
                    forecast_data = self._process_openweathermap_forecast(data, days)

            elif self.provider == "weatherapi":
                params = {
                    "key": self.api_key,
                    "q": location,
                    "days": min(days, 14),  # WeatherAPI supports up to 14 days
                    "aqi": "yes",  # Include air quality data
                    "alerts": "yes"  # Include weather alerts
                }

                data = self._fetch_json("forecast_url", params, "Failed to get forecast data")
                forecast_data = self._process_weatherapi_forecast(data, days, units)

            else:
                raise IntegrationError(f"Unsupported weather provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, forecast_data)
            return forecast_data

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get forecast data: {str(e)}")
            raise IntegrationError(f"Failed to get forecast data: {str(e)}") from e

    @handle_exceptions
    def search_locations(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search for locations

        Args:
            query: Search query
            limit: Maximum number of results (default: 5)

        Returns:
            List of location data; each entry's "id" is its "lat,lon" string

        Raises:
            IntegrationError: If no API key is set, the provider is unsupported,
                the API answers with a non-200 status, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("Weather API key not set")

        try:
            if self.provider == "openweathermap":
                params = {
                    "q": query,
                    "limit": limit,
                    "appid": self.api_key
                }
                data = self._fetch_json("geocoding_url", params, "Failed to search locations")

                # Process data into a standardized format
                locations = [
                    {
                        "name": item.get("name", ""),
                        "country": item.get("country", ""),
                        "state": item.get("state", ""),
                        "lat": item.get("lat"),
                        "lon": item.get("lon"),
                        "id": f"{item.get('lat')},{item.get('lon')}"
                    }
                    for item in data
                ]

            elif self.provider == "weatherapi":
                params = {
                    "key": self.api_key,
                    "q": query
                }
                data = self._fetch_json("search_url", params, "Failed to search locations")

                # Process data into a standardized format; the request carries
                # no limit parameter, so the result list is truncated here
                locations = [
                    {
                        "name": item.get("name", ""),
                        "country": item.get("country", ""),
                        "region": item.get("region", ""),
                        "lat": item.get("lat"),
                        "lon": item.get("lon"),
                        "id": f"{item.get('lat')},{item.get('lon')}"
                    }
                    for item in data[:limit]
                ]

            else:
                raise IntegrationError(f"Unsupported weather provider: {self.provider}")

            return locations

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to search locations: {str(e)}")
            raise IntegrationError(f"Failed to search locations: {str(e)}") from e

    @handle_exceptions
    def get_air_quality(self, location: str) -> Dict[str, Any]:
        """Get air quality data for a location

        Args:
            location: Location name or coordinates (lat,lon)

        Returns:
            Air quality data

        Raises:
            IntegrationError: If no API key is set, the provider is unsupported,
                no air quality data is available, the API answers with a
                non-200 status, or the request fails
        """
        if not self.api_key:
            raise IntegrationError("Weather API key not set")

        # Check cache
        cache_key = f"air_{location}_{self.provider}"
        cached = self._cache_get(cache_key)
        if cached is not None:
            return cached

        try:
            if self.provider == "openweathermap":
                # Get coordinates if location is a name
                coords = self._get_coordinates(location)

                # Use Air Pollution API
                params = {
                    "lat": coords[0],
                    "lon": coords[1],
                    "appid": self.api_key
                }
                data = self._fetch_json("air_pollution_url", params, "Failed to get air quality data")

                # Process data into a standardized format
                if "list" not in data or not data["list"]:
                    raise IntegrationError("No air quality data available")

                air_data = data["list"][0]
                components = air_data.get("components", {})
                aqi = air_data.get("main", {}).get("aqi", 0)

                # Map AQI value to category
                aqi_categories = ["Good", "Fair", "Moderate", "Poor", "Very Poor"]
                aqi_category = aqi_categories[aqi - 1] if 1 <= aqi <= 5 else "Unknown"

                air_quality = {
                    "aqi": aqi,
                    "category": aqi_category,
                    "components": {
                        "co": components.get("co"),  # Carbon monoxide (μg/m3)
                        "no": components.get("no"),  # Nitrogen monoxide (μg/m3)
                        "no2": components.get("no2"),  # Nitrogen dioxide (μg/m3)
                        "o3": components.get("o3"),  # Ozone (μg/m3)
                        "so2": components.get("so2"),  # Sulphur dioxide (μg/m3)
                        "pm2_5": components.get("pm2_5"),  # Fine particles (μg/m3)
                        "pm10": components.get("pm10"),  # Coarse particles (μg/m3)
                        "nh3": components.get("nh3")  # Ammonia (μg/m3)
                    },
                    "timestamp": self._to_iso(air_data.get("dt", 0))
                }

            elif self.provider == "weatherapi":
                # WeatherAPI includes air quality in current weather
                weather_data = self.get_current_weather(location)

                # The "air_quality" key is always present in the processed
                # result but holds None when the API sent no air quality
                # payload, so test the value rather than the key.
                air_quality = (weather_data or {}).get("air_quality")
                if not air_quality:
                    raise IntegrationError("No air quality data available")

            else:
                raise IntegrationError(f"Unsupported weather provider: {self.provider}")

            # Cache the result
            self._cache_set(cache_key, air_quality)
            return air_quality

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"Failed to get air quality data: {str(e)}")
            raise IntegrationError(f"Failed to get air quality data: {str(e)}") from e

    def _get_coordinates(self, location: str) -> Tuple[float, float]:
        """Get coordinates for a location name

        Args:
            location: Location name or coordinates

        Returns:
            Tuple of (latitude, longitude)

        Raises:
            IntegrationError: If the location search returns no result
        """
        # Check if location is already coordinates
        coords = self._parse_coordinates(location)
        if coords is not None:
            return coords

        # Search for location
        locations = self.search_locations(location, 1)
        if not locations:
            raise IntegrationError(f"Location not found: {location}")

        return locations[0]["lat"], locations[0]["lon"]

    # ------------------------------------------------------------------
    # OpenWeatherMap response processing
    # ------------------------------------------------------------------

    def _process_openweathermap_current(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process OpenWeatherMap current weather data

        Args:
            data: Raw API response data

        Returns:
            Processed weather data with "location" and "current" sections
        """
        weather = self._first_weather(data)
        main = data.get("main", {})
        wind = data.get("wind", {})
        clouds = data.get("clouds", {})
        rain = data.get("rain", {})
        snow = data.get("snow", {})
        sys = data.get("sys", {})
        coord = data.get("coord", {})

        return {
            "location": {
                "name": data.get("name", ""),
                "country": sys.get("country", ""),
                "lat": coord.get("lat"),
                "lon": coord.get("lon"),
                "timezone": data.get("timezone", 0),
                "sunrise": self._to_iso_or_none(sys.get("sunrise")),
                "sunset": self._to_iso_or_none(sys.get("sunset"))
            },
            "current": {
                "timestamp": self._to_iso(data.get("dt", 0)),
                "temp": main.get("temp"),
                "feels_like": main.get("feels_like"),
                "pressure": main.get("pressure"),
                "humidity": main.get("humidity"),
                "dew_point": None,  # Not provided in this API
                "uvi": None,  # Not provided in this API
                "clouds": clouds.get("all"),
                "visibility": data.get("visibility"),
                "wind_speed": wind.get("speed"),
                "wind_deg": wind.get("deg"),
                "wind_gust": wind.get("gust"),
                "weather": self._weather_summary(weather),
                "rain_1h": rain.get("1h"),
                "snow_1h": snow.get("1h")
            }
        }

    def _process_openweathermap_forecast(self, data: Dict[str, Any], days: int) -> Dict[str, Any]:
        """Process OpenWeatherMap forecast data

        Entries are grouped into days by the process-local calendar date of
        their timestamp.

        Args:
            data: Raw API response data
            days: Number of days to forecast

        Returns:
            Processed forecast data with "location" and "daily" sections
        """
        city = data.get("city", {})

        # Group forecast items by day
        daily_forecasts: Dict[str, List[Dict[str, Any]]] = {}
        for item in data.get("list", []):
            day_key = datetime.fromtimestamp(item.get("dt", 0)).strftime("%Y-%m-%d")
            daily_forecasts.setdefault(day_key, []).append(item)

        # Process daily forecasts
        daily = []
        for day_key in sorted(daily_forecasts)[:days]:
            items = daily_forecasts[day_key]

            # Calculate daily min/max temperatures, ignoring entries without one
            # (a None in the list would make min()/max() raise TypeError)
            temps = [
                temp
                for temp in ((entry.get("main") or {}).get("temp") for entry in items)
                if temp is not None
            ]
            temp_min = min(temps) if temps else None
            temp_max = max(temps) if temps else None

            # Get weather for the middle of the day
            middle_item = items[len(items) // 2] if items else {}
            weather = self._first_weather(middle_item)

            daily.append({
                "dt": day_key,
                "sunrise": None,  # Not provided in this API
                "sunset": None,  # Not provided in this API
                "temp": {
                    "day": None,  # Not directly provided
                    "min": temp_min,
                    "max": temp_max,
                    "night": None,  # Not directly provided
                    "eve": None,  # Not directly provided
                    "morn": None  # Not directly provided
                },
                "weather": self._weather_summary(weather),
                "hourly": [self._process_openweathermap_hourly(item) for item in items]
            })

        return {
            "location": {
                "name": city.get("name", ""),
                "country": city.get("country", ""),
                "lat": city.get("coord", {}).get("lat"),
                "lon": city.get("coord", {}).get("lon"),
                "timezone": city.get("timezone", 0)
            },
            "daily": daily
        }

    def _process_openweathermap_hourly(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Process OpenWeatherMap hourly forecast item

        Args:
            item: Hourly forecast item

        Returns:
            Processed hourly forecast
        """
        weather = self._first_weather(item)
        main = item.get("main", {})
        wind = item.get("wind", {})
        clouds = item.get("clouds", {})
        rain = item.get("rain", {})
        snow = item.get("snow", {})

        return {
            "dt": self._to_iso(item.get("dt", 0)),
            "temp": main.get("temp"),
            "feels_like": main.get("feels_like"),
            "pressure": main.get("pressure"),
            "humidity": main.get("humidity"),
            "dew_point": None,  # Not provided in this API
            "uvi": None,  # Not provided in this API
            "clouds": clouds.get("all"),
            "visibility": item.get("visibility"),
            "wind_speed": wind.get("speed"),
            "wind_deg": wind.get("deg"),
            "wind_gust": wind.get("gust"),
            "weather": self._weather_summary(weather),
            "pop": item.get("pop"),  # Probability of precipitation
            "rain": rain.get("3h"),
            "snow": snow.get("3h")
        }

    def _process_openweathermap_onecall(self, data: Dict[str, Any], days: int) -> Dict[str, Any]:
        """Process OpenWeatherMap OneCall API data

        Args:
            data: Raw API response data
            days: Number of days to forecast

        Returns:
            Processed forecast data with "location", "current" and "daily"
            sections; a day only has an "hourly" list when hourly entries
            exist for its date
        """
        daily_items = data.get("daily", [])[:days]

        # Group hourly items by day
        hourly_by_day: Dict[str, List[Dict[str, Any]]] = {}
        for item in data.get("hourly", []):
            day_key = datetime.fromtimestamp(item.get("dt", 0)).strftime("%Y-%m-%d")
            hourly_by_day.setdefault(day_key, []).append(item)

        # Process daily forecasts
        daily = []
        for item in daily_items:
            day_key = datetime.fromtimestamp(item.get("dt", 0)).strftime("%Y-%m-%d")
            weather = self._first_weather(item)
            temp = item.get("temp", {})

            daily_data = {
                "dt": day_key,
                "sunrise": self._to_iso_or_none(item.get("sunrise")),
                "sunset": self._to_iso_or_none(item.get("sunset")),
                "temp": {
                    "day": temp.get("day"),
                    "min": temp.get("min"),
                    "max": temp.get("max"),
                    "night": temp.get("night"),
                    "eve": temp.get("eve"),
                    "morn": temp.get("morn")
                },
                "feels_like": item.get("feels_like"),
                "pressure": item.get("pressure"),
                "humidity": item.get("humidity"),
                "dew_point": item.get("dew_point"),
                "wind_speed": item.get("wind_speed"),
                "wind_deg": item.get("wind_deg"),
                "wind_gust": item.get("wind_gust"),
                "weather": self._weather_summary(weather),
                "clouds": item.get("clouds"),
                "pop": item.get("pop"),
                "rain": item.get("rain"),
                "snow": item.get("snow"),
                "uvi": item.get("uvi")
            }

            # Add hourly data if available
            if day_key in hourly_by_day:
                daily_data["hourly"] = [
                    self._process_openweathermap_onecall_hourly(hour)
                    for hour in hourly_by_day[day_key]
                ]

            daily.append(daily_data)

        if "current" in data:
            current = self._process_openweathermap_onecall_current(data.get("current", {}))
        else:
            current = None

        return {
            "location": {
                "lat": data.get("lat"),
                "lon": data.get("lon"),
                "timezone": data.get("timezone"),
                "timezone_offset": data.get("timezone_offset")
            },
            "current": current,
            "daily": daily
        }

    def _process_openweathermap_onecall_current(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Process OpenWeatherMap OneCall current weather data

        Args:
            item: Current weather item

        Returns:
            Processed current weather
        """
        weather = self._first_weather(item)

        return {
            "timestamp": self._to_iso(item.get("dt", 0)),
            "sunrise": self._to_iso_or_none(item.get("sunrise")),
            "sunset": self._to_iso_or_none(item.get("sunset")),
            "temp": item.get("temp"),
            "feels_like": item.get("feels_like"),
            "pressure": item.get("pressure"),
            "humidity": item.get("humidity"),
            "dew_point": item.get("dew_point"),
            "uvi": item.get("uvi"),
            "clouds": item.get("clouds"),
            "visibility": item.get("visibility"),
            "wind_speed": item.get("wind_speed"),
            "wind_deg": item.get("wind_deg"),
            "wind_gust": item.get("wind_gust"),
            "weather": self._weather_summary(weather),
            "rain_1h": (item.get("rain") or {}).get("1h"),
            "snow_1h": (item.get("snow") or {}).get("1h")
        }

    def _process_openweathermap_onecall_hourly(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Process OpenWeatherMap OneCall hourly forecast item

        Args:
            item: Hourly forecast item

        Returns:
            Processed hourly forecast
        """
        weather = self._first_weather(item)

        return {
            "dt": self._to_iso(item.get("dt", 0)),
            "temp": item.get("temp"),
            "feels_like": item.get("feels_like"),
            "pressure": item.get("pressure"),
            "humidity": item.get("humidity"),
            "dew_point": item.get("dew_point"),
            "uvi": item.get("uvi"),
            "clouds": item.get("clouds"),
            "visibility": item.get("visibility"),
            "wind_speed": item.get("wind_speed"),
            "wind_deg": item.get("wind_deg"),
            "wind_gust": item.get("wind_gust"),
            "weather": self._weather_summary(weather),
            "pop": item.get("pop"),
            "rain_1h": (item.get("rain") or {}).get("1h"),
            "snow_1h": (item.get("snow") or {}).get("1h")
        }

    # ------------------------------------------------------------------
    # WeatherAPI response processing
    # ------------------------------------------------------------------

    def _process_weatherapi_current(self, data: Dict[str, Any], units: str) -> Dict[str, Any]:
        """Process WeatherAPI current weather data

        Any units value other than "metric" (including "standard") selects the
        imperial fields of the response.

        Args:
            data: Raw API response data
            units: Units of measurement

        Returns:
            Processed weather data with "location", "current" and
            "air_quality" sections; "air_quality" is None when the response
            carries no air quality payload
        """
        location = data.get("location", {})
        current = data.get("current", {})
        condition = current.get("condition", {})
        air_quality = current.get("air_quality", {})

        # Pick the response fields matching the requested units
        metric = units == "metric"
        temp_field = "temp_c" if metric else "temp_f"
        feelslike_field = "feelslike_c" if metric else "feelslike_f"
        wind_speed_field = "wind_kph" if metric else "wind_mph"
        wind_gust_field = "gust_kph" if metric else "gust_mph"
        vis_field = "vis_km" if metric else "vis_miles"
        precip_field = "precip_mm" if metric else "precip_in"

        # Process air quality data if available
        air_quality_data = None
        if air_quality:
            us_epa_index = air_quality.get("us-epa-index", 0)

            # Map EPA index to category
            epa_categories = ["Good", "Moderate", "Unhealthy for Sensitive Groups", "Unhealthy", "Very Unhealthy", "Hazardous"]
            epa_category = epa_categories[us_epa_index - 1] if 1 <= us_epa_index <= 6 else "Unknown"

            air_quality_data = {
                "aqi": us_epa_index,
                "category": epa_category,
                "components": {
                    "co": air_quality.get("co"),
                    "no2": air_quality.get("no2"),
                    "o3": air_quality.get("o3"),
                    "so2": air_quality.get("so2"),
                    "pm2_5": air_quality.get("pm2_5"),
                    "pm10": air_quality.get("pm10")
                }
            }

        return {
            "location": self._weatherapi_location(location),
            "current": {
                "timestamp": self._to_iso(current.get("last_updated_epoch", 0)),
                "temp": current.get(temp_field),
                "feels_like": current.get(feelslike_field),
                "condition": self._condition_summary(condition),
                "wind_speed": current.get(wind_speed_field),
                "wind_deg": current.get("wind_degree"),
                "wind_dir": current.get("wind_dir"),
                "wind_gust": current.get(wind_gust_field),
                "pressure": current.get("pressure_mb"),
                "humidity": current.get("humidity"),
                "cloud": current.get("cloud"),
                "visibility": current.get(vis_field),
                "precipitation": current.get(precip_field),
                "uv": current.get("uv")
            },
            "air_quality": air_quality_data
        }

    def _process_weatherapi_forecast(self, data: Dict[str, Any], days: int, units: str) -> Dict[str, Any]:
        """Process WeatherAPI forecast data

        Any units value other than "metric" (including "standard") selects the
        imperial fields of the response.

        Args:
            data: Raw API response data
            days: Number of days to forecast
            units: Units of measurement

        Returns:
            Processed forecast data with "location", "current", "daily" and
            "alerts" sections
        """
        location = data.get("location", {})
        forecast_days = data.get("forecast", {}).get("forecastday", [])[:days]

        # Field names depend only on the units, so select them once up front
        metric = units == "metric"
        temp_min_field = "mintemp_c" if metric else "mintemp_f"
        temp_max_field = "maxtemp_c" if metric else "maxtemp_f"
        avg_temp_field = "avgtemp_c" if metric else "avgtemp_f"
        wind_speed_field = "maxwind_kph" if metric else "maxwind_mph"
        vis_field = "avgvis_km" if metric else "avgvis_miles"
        precip_field = "totalprecip_mm" if metric else "totalprecip_in"

        hour_temp_field = "temp_c" if metric else "temp_f"
        hour_feelslike_field = "feelslike_c" if metric else "feelslike_f"
        hour_wind_speed_field = "wind_kph" if metric else "wind_mph"
        hour_wind_gust_field = "gust_kph" if metric else "gust_mph"
        hour_vis_field = "vis_km" if metric else "vis_miles"
        hour_precip_field = "precip_mm" if metric else "precip_in"

        # Process daily forecasts
        daily = []
        for day_data in forecast_days:
            day = day_data.get("day", {})
            astro = day_data.get("astro", {})
            condition = day.get("condition", {})

            # Process hourly data
            hourly = [
                {
                    "time": hour_data.get("time"),
                    "temp": hour_data.get(hour_temp_field),
                    "feels_like": hour_data.get(hour_feelslike_field),
                    "condition": self._condition_summary(hour_data.get("condition", {})),
                    "wind_speed": hour_data.get(hour_wind_speed_field),
                    "wind_deg": hour_data.get("wind_degree"),
                    "wind_dir": hour_data.get("wind_dir"),
                    "wind_gust": hour_data.get(hour_wind_gust_field),
                    "pressure": hour_data.get("pressure_mb"),
                    "humidity": hour_data.get("humidity"),
                    "cloud": hour_data.get("cloud"),
                    "visibility": hour_data.get(hour_vis_field),
                    "precipitation": hour_data.get(hour_precip_field),
                    "chance_of_rain": hour_data.get("chance_of_rain"),
                    "chance_of_snow": hour_data.get("chance_of_snow"),
                    "uv": hour_data.get("uv")
                }
                for hour_data in day_data.get("hour", [])
            ]

            daily.append({
                "date": day_data.get("date"),
                "temp_min": day.get(temp_min_field),
                "temp_max": day.get(temp_max_field),
                "temp_avg": day.get(avg_temp_field),
                "condition": self._condition_summary(condition),
                "wind_speed": day.get(wind_speed_field),
                "wind_deg": None,  # Not provided in day summary
                "wind_dir": None,  # Not provided in day summary
                "humidity": day.get("avghumidity"),
                "visibility": day.get(vis_field),
                "precipitation": day.get(precip_field),
                "chance_of_rain": day.get("daily_chance_of_rain"),
                "chance_of_snow": day.get("daily_chance_of_snow"),
                "uv": day.get("uv"),
                "sunrise": astro.get("sunrise"),
                "sunset": astro.get("sunset"),
                "moonrise": astro.get("moonrise"),
                "moonset": astro.get("moonset"),
                "moon_phase": astro.get("moon_phase"),
                "hourly": hourly
            })

        if "current" in data:
            current = self._process_weatherapi_current(data, units)["current"]
        else:
            current = None

        return {
            "location": self._weatherapi_location(location),
            "current": current,
            "daily": daily,
            "alerts": data.get("alerts", {}).get("alert", [])
        }