Spaces:
Sleeping
Sleeping
| import datetime | |
| import pandas as pd | |
| from xgboost import XGBRegressor | |
| import hopsworks | |
| import json | |
| from functions import util | |
| import os | |
# Set up: log in to Hopsworks and load the sensor configuration.
# The API key and project name are read from the environment.
api_key = os.getenv('HOPSWORKS_API_KEY')
project_name = os.getenv('HOPSWORKS_PROJECT')
project = hopsworks.login(project=project_name, api_key_value=api_key)
fs = project.get_feature_store()
secrets = util.secrets_api(project.name)

# The sensor location is stored as a JSON-encoded secret. It is fetched and
# parsed exactly once; repeating the lookup only adds a remote round trip.
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)
country = location['country']
city = location['city']
street = location['street']

AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value

# Reference time for selecting forecast rows. The zero-length offset leaves
# the value unchanged.
# NOTE(review): presumably kept as a knob for shifting the run date - confirm.
today = datetime.datetime.now() - datetime.timedelta(0)

feature_view = fs.get_feature_view(
    name='air_quality_fv',
    version=1,
)
### Retrieve model
# Look up version 1 of the registered model and download its files locally.
mr = project.get_model_registry()
retrieved_model = mr.get_model(name="air_quality_xgboost_model", version=1)
saved_model_dir = retrieved_model.download()

# Load the downloaded model.json into a fresh regressor instance.
retrieved_xgboost_model = XGBRegressor()
retrieved_xgboost_model.load_model(f"{saved_model_dir}/model.json")
### Retrieve features
weather_fg = fs.get_feature_group(name='weather', version=1)

# Read only the weather rows whose date is at or after the reference time.
# NOTE(review): `today` carries a time-of-day component, so rows stamped
# earlier on the same day are excluded - confirm this is intended.
today_timestamp = pd.to_datetime(today)
from_today = weather_fg.date >= today_timestamp
batch_data = weather_fg.filter(from_today).read()
### Predict and upload
# Weather columns passed to the model, in this order.
feature_columns = [
    'temperature_2m_mean',
    'precipitation_sum',
    'wind_speed_10m_max',
    'wind_direction_10m_dominant',
]
batch_data['predicted_pm25'] = retrieved_xgboost_model.predict(
    batch_data[feature_columns])

# Tag every prediction with the sensor location.
batch_data['street'] = street
batch_data['city'] = city
batch_data['country'] = country

# Sort chronologically BEFORE numbering the rows. The counter is assigned
# positionally, so numbering an unsorted frame would attach 1, 2, 3, ... to
# rows in whatever order they were read rather than in date order.
batch_data = batch_data.sort_values(by=['date'])

# Fill in the number of days before the date on which you made the forecast (base_date)
batch_data['days_before_forecast_day'] = range(1, len(batch_data) + 1)

# NOTE(review): timezone stripping of 'date' is intentionally left disabled.
#batch_data['date'] = batch_data['date'].dt.tz_convert(None).astype('datetime64[ns]')

# Render the forecast chart to ./img/pm25_forecast.png.
plt = util.plot_air_quality_forecast(city, street, batch_data, file_path="./img/pm25_forecast.png")
# Feature group that stores the predictions for later monitoring; it is
# fetched if it already exists and created otherwise.
monitor_fg_spec = {
    "name": 'aq_predictions',
    "description": 'Air Quality prediction monitoring',
    "version": 1,
    "primary_key": ['city', 'street', 'date', 'days_before_forecast_day'],
    "event_time": "date",
}
monitor_fg = fs.get_or_create_feature_group(**monitor_fg_spec)

print(f"Batch data: {batch_data}")

# Insert with wait_for_job=True so the read below runs after the insert job
# (presumably the call blocks until ingestion completes - see Hopsworks docs).
insert_options = {"wait_for_job": True}
monitor_fg.insert(batch_data, write_options=insert_options)

# Read back only the predictions whose day counter equals 1.
one_day_ahead = monitor_fg.days_before_forecast_day == 1
monitoring_df = monitor_fg.filter(one_day_ahead).read()
# Hindcast monitoring: compare stored predictions with measured PM2.5.
air_quality_fg = fs.get_feature_group(name='air_quality', version=1)
air_quality_df = air_quality_fg.read()

# Keep only the columns needed for the comparison.
outcome_df = air_quality_df[['date', 'pm25']]
preds_df = monitoring_df[['date', 'predicted_pm25']]

# Inner join on date, then order chronologically.
hindcast_df = preds_df.merge(outcome_df, on="date")
hindcast_df = hindcast_df.sort_values(by=['date'])

# No prediction has a matching measurement: fall back to the backfill helper.
if hindcast_df.empty:
    hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model)

# Render the hindcast chart to ./img/pm25_hindcast_1day.png.
plt = util.plot_air_quality_forecast(city, street, hindcast_df, file_path="./img/pm25_hindcast_1day.png", hindcast=True)