# evaluate.py
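"""Evaluate the final transformer model on the held-out test split.

Writes per-hour true vs. predicted consumption to test_results.csv and
RMSE / MAPE / R² metrics to evaluation_metrics.json in RESULTS_DIR.
"""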
import json
import logging
import os
import numpy as np
import pandas as pd
import torch
from momentfm.utils.utils import control_randomness
from sklearn.metrics import mean_squared_error, r2_score
from tqdm import tqdm
from transformer_model.scripts.config_transformer import (
    DATA_PATH,
    FORECAST_HORIZON,
    RESULTS_DIR,
    SEQ_LEN,
)
from transformer_model.scripts.utils.check_device import check_device
from transformer_model.scripts.utils.informer_dataset_class import InformerDataset
from transformer_model.scripts.utils.load_final_model import (
    load_final_transformer_model,
)

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


def evaluate():
    control_randomness(seed=13)

    # Set device
    device, backend, scaler = check_device()
    logging.info(f"Evaluation is running on: {backend} ({device})")

    # Load final model
    model, _ = load_final_transformer_model(device)

    # Recreate training dataset to get the fitted scaler
    train_dataset = InformerDataset(
        data_split="train", random_seed=13, forecast_horizon=FORECAST_HORIZON
    )

    # Use its scaler in the test dataset
    test_dataset = InformerDataset(
        data_split="test", random_seed=13, forecast_horizon=FORECAST_HORIZON
    )
    test_dataset.scaler = train_dataset.scaler

    test_loader = torch.utils.data.DataLoader(
        test_dataset, batch_size=32, shuffle=False
    )
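
    # Run the model over the test set and collect ground truth and predictions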
    trues, preds = [], []
    with torch.no_grad():
        for timeseries, forecast, input_mask in tqdm(
            test_loader, desc="Evaluating on test set"
        ):
            timeseries = timeseries.float().to(device)
            forecast = forecast.float().to(device)
            input_mask = input_mask.to(device)  # <- important: same device as the model
            output = model(x_enc=timeseries, input_mask=input_mask)
            trues.append(forecast.cpu().numpy())
            preds.append(output.forecast.cpu().numpy())

    trues = np.concatenate(trues, axis=0)
    preds = np.concatenate(preds, axis=0)
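    # trues / preds are stacked as (num_windows, n_channels, forecast_horizon)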

    # Extract only the first feature (consumption)
    true_values = trues[:, 0, :]
    pred_values = preds[:, 0, :]

    # Inverse normalization: pad the consumption column with zeros for the
    # remaining features so the scaler sees the shape it was fitted on
    n_features = test_dataset.n_channels
    true_reshaped = np.column_stack(
        [true_values.flatten()]
        + [np.zeros_like(true_values.flatten())] * (n_features - 1)
    )
    pred_reshaped = np.column_stack(
        [pred_values.flatten()]
        + [np.zeros_like(pred_values.flatten())] * (n_features - 1)
    )
    true_original = test_dataset.scaler.inverse_transform(true_reshaped)[:, 0]
    pred_original = test_dataset.scaler.inverse_transform(pred_reshaped)[:, 0]

    # Build the timestamp index: InformerDataset drops the date column, so we
    # reload the original CSV and use the index where the test split starts to
    # recover the first test timestamp
    csv_path = os.path.join(DATA_PATH)
    df = pd.read_csv(csv_path, parse_dates=["date"])
    train_len = len(train_dataset)
    test_start_idx = train_len + SEQ_LEN
    start_timestamp = df["date"].iloc[test_start_idx]
    logging.info(f"[DEBUG] Test split starts at: {start_timestamp}")
    timestamps = [
        start_timestamp + pd.Timedelta(hours=i) for i in range(len(true_original))
    ]
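
    # One row per forecasted hour, starting at the first test timestamp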
    df = pd.DataFrame(
        {
            "Timestamp": timestamps,
            "True Consumption (MW)": true_original,
            "Predicted Consumption (MW)": pred_original,
        }
    )

    # Save results to CSV
    os.makedirs(RESULTS_DIR, exist_ok=True)
    results_path = os.path.join(RESULTS_DIR, "test_results.csv")
    df.to_csv(results_path, index=False)
    logging.info(f"Saved prediction results to: {results_path}")

    # Evaluation metrics
    mse = mean_squared_error(
        df["True Consumption (MW)"], df["Predicted Consumption (MW)"]
    )
    rmse = np.sqrt(mse)
    mape = (
        np.mean(
            np.abs(
                (df["True Consumption (MW)"] - df["Predicted Consumption (MW)"])
                / df["True Consumption (MW)"]
            )
        )
        * 100
    )
    r2 = r2_score(df["True Consumption (MW)"], df["Predicted Consumption (MW)"])

    # Save metrics to JSON
    metrics = {"RMSE": float(rmse), "MAPE": float(mape), "R2": float(r2)}
    metrics_path = os.path.join(RESULTS_DIR, "evaluation_metrics.json")
    with open(metrics_path, "w") as f:
        json.dump(metrics, f)
    logging.info(f"Saved evaluation metrics to: {metrics_path}")

    logging.info(f"RMSE: {rmse:.3f} | MAPE: {mape:.2f}% | R²: {r2:.3f}")


if __name__ == "__main__":
    evaluate()