Spaces:
Sleeping
Sleeping
import numpy as np | |
from Backend.Benchmarks import Himmelblau, Adjiman, Brent, Ackley | |
from Backend.optimizers import adam, SGD, Azure, RMSprop | |
from Backend.ML_Tasks import MNISTRunner, CIFAR10Runner | |
from Metrics import calculate_benchmark_metrics, calculate_ml_metrics | |
from Plots import plot_benchmark_surface, plot_ml_curves | |
class Engine:
    """Run optimizer comparison studies on benchmark functions and ML tasks.

    The engine keeps three name -> class registries (benchmarks, optimizers
    and ML task runners) and instantiates the requested entries from a plain
    ``dict`` configuration.
    """

    def __init__(self):
        """Populate the benchmark, optimizer and ML-task registries."""
        self.benchmarks = {
            "Himmelblau": Himmelblau,
            "Adjiman": Adjiman,
            "Brent": Brent,
            "Ackley": Ackley,
        }
        self.optimizers = {
            "Adam": adam,
            "SGD": SGD,
            "AzureSky": Azure,
            "RMSprop": RMSprop,
        }
        self.ml_tasks = {"MNIST": MNISTRunner, "CIFAR-10": CIFAR10Runner}

    def run(self, config):
        """Run a study based on the provided configuration.

        Args:
            config: Study configuration. ``config["mode"]`` selects the study:
                ``"benchmark"`` or ``"ml_task"``. The remaining keys are
                consumed by the selected study method.

        Returns:
            The result dict of the selected study.

        Raises:
            KeyError: If ``config`` has no ``"mode"`` entry.
            ValueError: If the mode is not one of the supported values.
        """
        mode = config["mode"]
        if mode == "benchmark":
            return self.run_benchmark_study(config)
        if mode == "ml_task":
            return self.run_ml_task_study(config)
        raise ValueError(f"Invalid mode: {mode}")

    def _build_optimizers(self, config):
        """Instantiate every optimizer named in ``config["optimizers"]``.

        Shared by both study types so the name lookup and the AzureSky
        special case live in exactly one place.

        Raises:
            ValueError: If a name is not present in ``self.optimizers``.
        """
        optimizers = []
        for opt_name in config["optimizers"]:
            opt_class = self.optimizers.get(opt_name)
            if not opt_class:
                raise ValueError(f"Unknown optimizer: {opt_name}")
            # Only AzureSky receives ``use_sa``, and only when the caller
            # supplied it; every other optimizer is built with its defaults.
            kwargs = {}
            if opt_name == "AzureSky" and "use_sa" in config:
                kwargs["use_sa"] = config["use_sa"]
            optimizers.append(opt_class(**kwargs))
        return optimizers

    def run_benchmark_study(self, config):
        """Run a benchmark study comparing multiple optimizers.

        Every optimizer starts from the same randomly drawn initial point and
        takes ``max_iter`` steps on the selected benchmark function.

        Args:
            config: Reads ``"benchmark_func"`` and ``"optimizers"``
                (required), plus ``"dim"`` (default 2), ``"max_iter"``
                (default 100) and ``"use_sa"`` (optional, AzureSky only).

        Returns:
            A dict with keys ``"plot"``, ``"metrics"`` (one entry per
            optimizer) and ``"paths"`` (one array of visited points per
            optimizer).

        Raises:
            ValueError: If the benchmark or an optimizer name is unknown, or
                if ``max_iter`` is smaller than 1.
        """
        benchmark_class = self.benchmarks.get(config["benchmark_func"])
        if not benchmark_class:
            raise ValueError(f"Unknown benchmark: {config['benchmark_func']}")
        benchmark = benchmark_class()
        optimizers = self._build_optimizers(config)

        max_iter = config.get("max_iter", 100)
        # With zero iterations every path would be empty and the final-point
        # lookup below (``path[-1]``) would fail with an opaque IndexError.
        if max_iter < 1:
            raise ValueError(f"max_iter must be at least 1, got {max_iter}")

        initial_point = np.random.randn(config.get("dim", 2))

        paths = []
        loss_values = []
        for opt in optimizers:
            path = []
            losses = []
            x = initial_point.copy()
            opt.reset()  # Reset optimizer state
            for _ in range(max_iter):
                grad = benchmark.grad_f(x)
                x = opt.step(x, grad)
                # Only post-step points are recorded; the shared initial
                # point itself is not part of the path.
                path.append(x.copy())
                losses.append(benchmark.f(x))
            paths.append(np.array(path))
            loss_values.append(losses)

        metrics = [
            calculate_benchmark_metrics(
                opt_path[-1], benchmark.global_min, opt_path, opt_losses
            )
            for opt_path, opt_losses in zip(paths, loss_values)
        ]
        plot = plot_benchmark_surface(benchmark, paths, config["optimizers"])
        return {"plot": plot, "metrics": metrics, "paths": paths}

    def run_ml_task_study(self, config):
        """Run an ML task study comparing multiple optimizers.

        Args:
            config: Reads ``"dataset"`` and ``"optimizers"`` (required), plus
                ``"epochs"`` (default 10), ``"batch_size"`` (default 32),
                ``"lr"`` (default 0.001) and ``"use_sa"`` (optional,
                AzureSky only).

        Returns:
            A dict with keys ``"plot_acc"``, ``"plot_loss"``, ``"metrics"``
            and ``"histories"`` (one history per optimizer).

        Raises:
            ValueError: If the dataset or an optimizer name is unknown.
        """
        task_class = self.ml_tasks.get(config["dataset"])
        if not task_class:
            raise ValueError(f"Unknown dataset: {config['dataset']}")
        task_runner = task_class()
        optimizers = self._build_optimizers(config)

        # NOTE(review): the same runner instance is reused for every
        # optimizer; whether ``run`` re-initialises the model on each call is
        # not visible from here -- confirm, otherwise later optimizers start
        # from already-trained weights.
        histories = [
            task_runner.run(
                optimizer=opt,
                epochs=config.get("epochs", 10),
                batch_size=config.get("batch_size", 32),
                lr=config.get("lr", 0.001),
            )
            for opt in optimizers
        ]

        # Each history is indexed as history["train" | "val"]["accuracy" | "loss"].
        metrics = [calculate_ml_metrics(h["train"], h["val"]) for h in histories]
        plot_acc = plot_ml_curves(
            [h["train"]["accuracy"] for h in histories],
            [h["val"]["accuracy"] for h in histories],
            config["optimizers"],
            "Accuracy",
        )
        plot_loss = plot_ml_curves(
            [h["train"]["loss"] for h in histories],
            [h["val"]["loss"] for h in histories],
            config["optimizers"],
            "Loss",
        )
        return {
            "plot_acc": plot_acc,
            "plot_loss": plot_loss,
            "metrics": metrics,
            "histories": histories,
        }

    def list_benchmarks(self):
        """Return available benchmark functions."""
        return list(self.benchmarks.keys())

    def list_optimizers(self):
        """Return available optimizers."""
        return list(self.optimizers.keys())

    def list_ml_tasks(self):
        """Return available ML tasks."""
        return list(self.ml_tasks.keys())
if __name__ == "__main__":
    # Demo entry point: run one benchmark study and one ML-task study back to
    # back, printing the metrics of each as soon as it finishes.
    engine = Engine()
    demo_studies = (
        (
            "Benchmark Results:",
            {
                "mode": "benchmark",
                "benchmark_func": "Himmelblau",
                "optimizers": ["Adam", "AzureSky"],
                "dim": 2,
                "max_iter": 100,
                "use_sa": True,
            },
        ),
        (
            "ML Task Results:",
            {
                "mode": "ml_task",
                "dataset": "MNIST",
                "optimizers": ["Adam", "AzureSky"],
                "epochs": 5,
                "batch_size": 32,
                "lr": 0.001,
                "use_sa": True,
            },
        ),
    )
    for label, config in demo_studies:
        results = engine.run(config)
        print(label, results["metrics"])