|
import gradio as gr |
|
import numpy as np |
|
import logging |
|
from tqdm import tqdm |
|
|
|
class SwarmAgent:
    """State container for a single member of the swarm.

    Attributes:
        position: The agent's current parameter array (stored as given).
        velocity: The agent's velocity array (stored as given).
        m: Zero-filled array with the shape and dtype of ``position``.
        v: A second, independent zero-filled array like ``m``.
    """

    def __init__(self, position, velocity):
        self.position, self.velocity = position, velocity
        # Two separate zero buffers shaped like the position.
        # NOTE(review): presumably optimiser moment accumulators; nothing in
        # this file reads them - confirm before relying on that meaning.
        self.m, self.v = (np.zeros_like(position) for _ in range(2))
|
|
|
class SwarmNeuralNetwork:
    """Population of agents whose positions are pulled towards a target.

    Every call to ``update_agents`` moves each agent's position towards
    ``target_response``, adds Gaussian noise scaled by a fixed 1000-entry
    schedule, and clips the result to ``[-1, 1]``.

    Args:
        num_agents: Number of agents to create.
        param_shape: Shape tuple of each agent's position/velocity array.
        target_response: Value positions are pulled towards and that evaluated
            responses are compared against. May be ``None`` at construction
            when ``load_model`` is going to restore it.
        optimization_function: Callable applied to an agent position by
            ``evaluate_function``.
    """

    def __init__(self, num_agents, param_shape, target_response, optimization_function):
        # param_shape must be assigned first: the random_* helpers read it.
        self.param_shape = param_shape
        self.agents = [
            SwarmAgent(self.random_position(), self.random_velocity())
            for _ in range(num_agents)
        ]
        self.target_response = target_response
        self.optimization_function = optimization_function
        self.current_epoch = 0
        # Noise level per timestep, decreasing linearly from 0.1 to 0.002.
        self.noise_schedule = np.linspace(0.1, 0.002, 1000)

    def random_position(self):
        """Return a standard-normal array of shape ``param_shape``."""
        return np.random.randn(*self.param_shape)

    def random_velocity(self):
        """Return a normal array of shape ``param_shape`` scaled by 0.01."""
        return np.random.randn(*self.param_shape) * 0.01

    def evaluate_function(self, params):
        """Return ``optimization_function(params)``."""
        return self.optimization_function(params)

    def update_agents(self, timestep):
        """Advance every agent by one noisy step towards ``target_response``.

        Args:
            timestep: Index into the noise schedule; values past the end of
                the schedule reuse its last entry.

        Raises:
            ValueError: If ``target_response`` is ``None``. Without this
                check the subtraction below fails with an opaque TypeError.
        """
        if self.target_response is None:
            raise ValueError(
                "target_response is not set; pass it to the constructor or "
                "load a model that stores it"
            )

        last_index = len(self.noise_schedule) - 1
        noise_level = self.noise_schedule[min(timestep, last_index)]
        # Loop invariants, hoisted out of the per-agent loop.
        noise_scale = np.sqrt(noise_level)
        keep_fraction = 1 - noise_level

        for agent in self.agents:
            # Offset of the current position from the target.
            predicted_noise = agent.position - self.target_response
            # Remove a noise_level-sized share of that offset and rescale.
            denoised = (agent.position - noise_level * predicted_noise) / keep_fraction
            # Re-inject fresh Gaussian noise, then keep positions in [-1, 1].
            noisy = denoised + np.random.randn(*self.param_shape) * noise_scale
            agent.position = np.clip(noisy, -1, 1)

    def train(self, epochs):
        """Run ``epochs`` update steps, logging the MSE after each one.

        The MSE is the mean squared difference between every agent's
        evaluated response and ``target_response``. It is written to
        ``training.log`` every epoch and printed every tenth epoch.

        Args:
            epochs: Number of update steps to run.
        """
        # basicConfig only takes effect the first time the root logger is
        # configured; later calls are no-ops.
        logging.basicConfig(filename='training.log', level=logging.INFO)

        for epoch in tqdm(range(epochs), desc="Training Epochs"):
            self.update_agents(epoch)

            responses = [self.evaluate_function(agent.position) for agent in self.agents]
            mse = np.mean((np.array(responses) - self.target_response) ** 2)
            logging.info("Epoch %s, MSE: %s", epoch, mse)

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, MSE: {mse}")
            self.current_epoch += 1

    def save_model(self, filename):
        """Pickle the agents, epoch counter and target to ``filename``.

        ``target_response`` is stored alongside the agents so a freshly
        constructed network can resume generation after ``load_model``.

        Args:
            filename: Destination path; ``np.save`` appends ``.npy`` when the
                name does not already end with it.
        """
        model_state = {
            'agents': self.agents,
            'current_epoch': self.current_epoch,
            'target_response': self.target_response,
        }
        np.save(filename, model_state)

    def load_model(self, filename):
        """Restore state previously written by ``save_model``.

        Files that lack a ``target_response`` entry leave the current
        ``target_response`` untouched.

        Args:
            filename: Path of the ``.npy`` file to read.
        """
        # SECURITY: allow_pickle=True unpickles arbitrary objects and can
        # execute code; only load model files from a trusted source.
        model_state = np.load(filename, allow_pickle=True).item()
        self.agents = model_state['agents']
        self.current_epoch = model_state['current_epoch']
        if 'target_response' in model_state:
            self.target_response = model_state['target_response']

    def generate_new_parameters(self, num_steps=1000):
        """Re-randomise all agents, run the update loop, return the best position.

        Timesteps are visited from ``num_steps - 1`` down to ``0``. The best
        agent is the one whose evaluated response has the smallest squared
        difference from ``target_response``.

        Args:
            num_steps: Number of update steps to run.

        Returns:
            The position array of the best agent.
        """
        for agent in self.agents:
            agent.position = np.random.randn(*self.param_shape)

        for step in tqdm(range(num_steps), desc="Generating Parameters"):
            self.update_agents(num_steps - step - 1)

        best_agent = min(
            self.agents,
            key=lambda agent: np.mean(
                (self.evaluate_function(agent.position) - self.target_response) ** 2
            ),
        )
        return best_agent.position
|
|
|
|
|
def optimization_function(params, optimum=3):
    """Return the sum of squared deviations of ``params`` from ``optimum``.

    Args:
        params: Array-like of numbers (NumPy array, list, tuple or scalar).
        optimum: Value at which each element contributes zero; defaults to
            ``3``, the constant the function was originally hard-coded to.

    Returns:
        The scalar ``sum((params - optimum) ** 2)``.
    """
    # asarray lets plain Python sequences through; ndarrays pass unchanged.
    deviations = np.asarray(params) - optimum
    return np.sum(deviations ** 2)
|
|
|
|
|
def train_snn(target_response, num_agents, epochs):
    """Train a swarm network, save it, and return freshly generated parameters.

    Args:
        target_response: Target value handed to the network.
        num_agents: Number of agents; coerced to ``int`` because UI widgets
            may deliver a float.
        epochs: Number of training epochs; coerced to ``int`` for the same
            reason.

    Returns:
        The parameter array returned by the network's
        ``generate_new_parameters``.
    """
    # range() rejects floats, so normalise the numeric widget values first.
    num_agents = int(num_agents)
    epochs = int(epochs)

    param_shape = (10,)
    snn = SwarmNeuralNetwork(
        num_agents=num_agents,
        param_shape=param_shape,
        target_response=target_response,
        optimization_function=optimization_function,
    )
    snn.train(epochs=epochs)
    snn.save_model('snn_model.npy')
    return snn.generate_new_parameters()
|
|
|
def generate_new_parameters(target_response=None):
    """Load ``snn_model.npy`` and generate a new parameter array from it.

    Args:
        target_response: Target value to generate towards. When omitted, the
            value held by the network after loading is used.

    Returns:
        The parameter array returned by the network's
        ``generate_new_parameters``.

    Raises:
        ValueError: If no target response is available after loading. The
            update step subtracts the target from each position, so running
            with ``None`` would otherwise fail with a TypeError.
    """
    param_shape = (10,)
    snn = SwarmNeuralNetwork(
        num_agents=2000,
        param_shape=param_shape,
        target_response=target_response,
        optimization_function=optimization_function,
    )
    snn.load_model('snn_model.npy')

    if target_response is not None:
        # An explicit argument takes precedence over any loaded state.
        snn.target_response = target_response
    if snn.target_response is None:
        raise ValueError(
            "no target_response available: pass one explicitly or load a "
            "model that stores it"
        )

    return snn.generate_new_parameters()
|
|
|
# Input widgets, listed in the positional order train_snn expects:
# target response, number of agents, number of epochs.
_input_components = [
    gr.Number(label="Target Response"),
    gr.Slider(minimum=500, maximum=3000, value=2000, label="Number of Agents"),
    gr.Slider(minimum=10, maximum=200, value=100, label="Number of Epochs"),
]

# The generated parameter array is shown as text.
_output_component = gr.Textbox(label="Generated Parameters")

interface = gr.Interface(
    fn=train_snn,
    inputs=_input_components,
    outputs=_output_component,
    title="Swarm Neural Network Function Optimization",
    description="Set the target response, number of agents, and epochs to train the Swarm Neural Network to generate optimized function parameters.",
)

# Start the web UI as soon as the module is executed.
interface.launch()
|
|