import gradio as gr
import numpy as np
import matplotlib.pyplot as plt


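# Simple element-wise activations: ReLU for the hidden layers, tanh for the output layer.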
def relu(x):
    return np.maximum(0, x)


def relu_derivative(x):
    return (x > 0).astype(float)


def tanh(x):
    return np.tanh(x)


def tanh_derivative(x):
    return 1 - np.tanh(x)**2


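# Adam-style optimizer: bias-corrected first and second moments, with each update
# additionally scaled by a smoothed "fractal" factor derived from a short
# Mandelbrot-type iteration seeded by gradient/parameter statistics.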
class EveOptimizer:
    def __init__(self, params, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.params = params
        self.lr = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.t = 0
        self.m = [np.zeros_like(p) for p in params]
        self.v = [np.zeros_like(p) for p in params]
        self.fractal_memory = [np.zeros_like(p) for p in params]

    def step(self, grads):
        self.t += 1
        for i, (param, grad) in enumerate(zip(self.params, grads)):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad ** 2)

            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            fractal_factor = self.fractal_adjustment(param, grad)
            self.fractal_memory[i] = 0.9 * self.fractal_memory[i] + 0.1 * fractal_factor

            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon) * self.fractal_memory[i]

    def fractal_adjustment(self, param, grad):
        c = np.mean(grad) + 1j * np.std(param)
        z = 0
        for _ in range(10):
            z = z**2 + c
            if abs(z) > 2:
                break
        return 1 / (1 + abs(z))


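# Batch normalization with learnable scale (gamma) and shift (beta); moving
# statistics are kept for inference mode.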
class BatchNormalization:
    def __init__(self, input_shape):
        self.gamma = np.ones(input_shape)
        self.beta = np.zeros(input_shape)
        self.epsilon = 1e-5
        self.moving_mean = np.zeros(input_shape)
        self.moving_var = np.ones(input_shape)

    def forward(self, x, training=True):
        if training:
            mean = np.mean(x, axis=0)
            var = np.var(x, axis=0)
            self.moving_mean = 0.99 * self.moving_mean + 0.01 * mean
            self.moving_var = 0.99 * self.moving_var + 0.01 * var
        else:
            mean = self.moving_mean
            var = self.moving_var

        x_norm = (x - mean) / np.sqrt(var + self.epsilon)
        out = self.gamma * x_norm + self.beta
        if training:
            self.cache = (x, x_norm, mean, var)
        return out

    def backward(self, dout):
        x, x_norm, mean, var = self.cache
        m = x.shape[0]

        dx_norm = dout * self.gamma
        dvar = np.sum(dx_norm * (x - mean) * -0.5 * (var + self.epsilon)**(-1.5), axis=0)
        dmean = np.sum(dx_norm * -1 / np.sqrt(var + self.epsilon), axis=0) + dvar * np.mean(-2 * (x - mean), axis=0)

        dx = dx_norm / np.sqrt(var + self.epsilon) + dvar * 2 * (x - mean) / m + dmean / m
        dgamma = np.sum(dout * x_norm, axis=0)
        dbeta = np.sum(dout, axis=0)

        return dx, dgamma, dbeta


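# Tracks the lowest average and maximum batch losses seen so far, snapshots the
# hidden-layer weights that achieved them, and can restore that snapshot later.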
class Reward:
    def __init__(self):
        self.lowest_avg_batch_loss = float('inf')
        self.lowest_max_batch_loss = float('inf')
        self.best_weights = None

    def update(self, avg_batch_loss, max_batch_loss, network):
        improved = False
        if avg_batch_loss < self.lowest_avg_batch_loss:
            self.lowest_avg_batch_loss = avg_batch_loss
            improved = True
        if max_batch_loss < self.lowest_max_batch_loss:
            self.lowest_max_batch_loss = max_batch_loss
            improved = True
        if improved:
            self.best_weights = self.get_network_weights(network)

    def get_network_weights(self, network):
        weights = []
        for layer in network.layers:
            layer_weights = []
            for agent in layer.agents:
                agent_weights = {
                    'weights': agent.weights.copy(),
                    'bias': agent.bias.copy(),
                    'bn_gamma': agent.bn.gamma.copy(),
                    'bn_beta': agent.bn.beta.copy()
                }
                layer_weights.append(agent_weights)
            weights.append(layer_weights)
        return weights

    def apply_best_weights(self, network):
        if self.best_weights is not None:
            for layer, layer_weights in zip(network.layers, self.best_weights):
                for agent, agent_weights in zip(layer.agents, layer_weights):
                    # Copy in place so each agent's EveOptimizer keeps operating on
                    # the same arrays it was constructed with; rebinding the
                    # attributes would silently detach them from the optimizer.
                    np.copyto(agent.weights, agent_weights['weights'])
                    np.copyto(agent.bias, agent_weights['bias'])
                    np.copyto(agent.bn.gamma, agent_weights['bn_gamma'])
                    np.copyto(agent.bn.beta, agent_weights['bn_beta'])


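# A single "agent": one He-initialized dense layer with batch normalization and a
# ReLU activation, holding its own EveOptimizer and a fractal visualization method.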
class Agent:
    def __init__(self, id, input_size, output_size, fractal_method):
        self.id = id
        self.weights = np.random.randn(input_size, output_size) * np.sqrt(2. / input_size)
        self.bias = np.zeros((1, output_size))
        self.fractal_method = fractal_method
        self.bn = BatchNormalization((output_size,))

        self.optimizer = EveOptimizer([self.weights, self.bias, self.bn.gamma, self.bn.beta])

    def forward(self, x, training=True):
        self.last_input = x
        z = np.dot(x, self.weights) + self.bias
        z_bn = self.bn.forward(z, training)
        self.last_output = relu(z_bn)
        return self.last_output

    def backward(self, error, l2_lambda=1e-5):
        delta = error * relu_derivative(self.last_output)
        delta, dgamma, dbeta = self.bn.backward(delta)

        dw = np.dot(self.last_input.T, delta) + l2_lambda * self.weights
        db = np.sum(delta, axis=0, keepdims=True)

        # Compute the error to propagate before the optimizer updates the weights,
        # so it reflects the weights actually used in the forward pass.
        grad_input = np.dot(delta, self.weights.T)

        self.optimizer.step([dw, db, dgamma, dbeta])

        return grad_input

    def apply_fractal(self, x):
        return self.fractal_method(x)


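# A layer made of several agents; forward activations and backpropagated errors
# are averaged across the swarm.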
class Swarm:
    def __init__(self, num_agents, input_size, output_size, fractal_method):
        self.agents = [Agent(i, input_size, output_size, fractal_method) for i in range(num_agents)]

    def forward(self, x, training=True):
        results = [agent.forward(x, training) for agent in self.agents]
        return np.mean(results, axis=0)

    def backward(self, error, l2_lambda):
        errors = [agent.backward(error, l2_lambda) for agent in self.agents]
        return np.mean(errors, axis=0)

    def apply_fractal(self, x):
        results = [agent.apply_fractal(x) for agent in self.agents]
        return np.mean(results, axis=0)


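# Full network: a stack of three-agent swarm layers (ReLU) followed by a
# single-agent tanh output layer, trained with mini-batch MSE, error clipping,
# best-weight restoration, and early stopping.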
class SwarmNeuralNetwork:
    def __init__(self, layer_sizes, fractal_methods):
        self.layers = []
        for i in range(len(layer_sizes) - 2):
            self.layers.append(Swarm(num_agents=3,
                                     input_size=layer_sizes[i],
                                     output_size=layer_sizes[i+1],
                                     fractal_method=fractal_methods[i]))
        self.output_layer = Swarm(num_agents=1,
                                  input_size=layer_sizes[-2],
                                  output_size=layer_sizes[-1],
                                  fractal_method=fractal_methods[-1])
        self.reward = Reward()

    def forward(self, x, training=True):
        self.layer_outputs = [x]
        for layer in self.layers:
            x = layer.forward(x, training)
            self.layer_outputs.append(x)
        self.final_output = tanh(self.output_layer.forward(x, training))
        return self.final_output

    def backward(self, error, l2_lambda=1e-5):
        error = error * tanh_derivative(self.final_output)
        error = self.output_layer.backward(error, l2_lambda)
        for i in reversed(range(len(self.layers))):
            error = self.layers[i].backward(error, l2_lambda)

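    # Mini-batch training: errors are clipped to [-1, 1], the best weights seen so
    # far are restored at the start of each epoch, and training stops early when
    # the full-dataset MSE fails to improve for `patience` epochs.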
    def train(self, X, y, epochs, batch_size=32, l2_lambda=1e-5, patience=50):
        best_mse = float('inf')
        patience_counter = 0

        for epoch in range(epochs):
            indices = np.arange(len(X))
            np.random.shuffle(indices)

            self.reward.apply_best_weights(self)

            epoch_losses = []
            for start_idx in range(0, len(X) - batch_size + 1, batch_size):
                batch_indices = indices[start_idx:start_idx+batch_size]
                X_batch = X[batch_indices]
                y_batch = y[batch_indices]

                output = self.forward(X_batch)
                error = y_batch - output

                error = np.clip(error, -1, 1)

                self.backward(error, l2_lambda)

                epoch_losses.append(np.mean(np.square(error)))

            avg_batch_loss = np.mean(epoch_losses)
            max_batch_loss = np.max(epoch_losses)
            self.reward.update(avg_batch_loss, max_batch_loss, self)

            mse = np.mean(np.square(y - self.forward(X, training=False)))

            if epoch % 100 == 0:
                print(f"Epoch {epoch}, MSE: {mse:.6f}, Avg Batch Loss: {avg_batch_loss:.6f}, Min Batch Loss: {np.min(epoch_losses):.6f}, Max Batch Loss: {max_batch_loss:.6f}")

            if mse < best_mse:
                best_mse = mse
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch}")
                break

        return best_mse

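    # Uses the hidden-layer activations cached by the most recent forward pass;
    # the `x` argument is not read directly.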
    def apply_fractals(self, x):
        fractal_outputs = []
        for i, layer in enumerate(self.layers):
            activations = self.layer_outputs[i+1]
            fractal_output = layer.apply_fractal(activations)
            fractal_outputs.append(fractal_output)
        return fractal_outputs


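# Lightweight fractal-style visualizations driven by layer activations: the
# "sierpinski" method traces a circle whose radius is the mean activation, while
# the Mandelbrot/Julia methods return per-sample escape-time counts.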
def sierpinski_fractal(input_data):
    t = np.linspace(0, 2 * np.pi, input_data.shape[0])
    x = np.mean(input_data) * np.cos(t)
    y = np.mean(input_data) * np.sin(t)
    return x, y


def mandelbrot_fractal(input_data, max_iter=10):
    output = np.zeros(input_data.shape[0])
    for i in range(input_data.shape[0]):
        c = input_data[i, 0] + 0.1j * np.std(input_data)
        z = 0
        for n in range(max_iter):
            if abs(z) > 2:
                output[i] = n
                break
            z = z*z + c
        else:
            output[i] = max_iter
    return output


def julia_fractal(input_data, max_iter=10):
    output = np.zeros(input_data.shape[0])
    c = -0.8 + 0.156j
    for i in range(input_data.shape[0]):
        z = input_data[i, 0] + 0.1j * np.std(input_data)
        for n in range(max_iter):
            if abs(z) > 2:
                output[i] = n
                break
            z = z*z + c
        else:
            output[i] = max_iter
    return output


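# Builds a sine-fitting dataset, trains the network, and returns a 2x2 matplotlib
# figure: the fit plus one fractal visualization per hidden layer.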
def run_snn(epochs, batch_size, l2_lambda, patience):
    np.random.seed(42)

    # Slider values may arrive as floats; cast the integer-valued hyperparameters.
    epochs = int(epochs)
    batch_size = int(batch_size)
    patience = int(patience)

    X = np.linspace(0, 10, 1000).reshape(-1, 1)
    y = np.sin(X).reshape(-1, 1)

    X = (X - X.min()) / (X.max() - X.min())
    y = (y - y.min()) / (y.max() - y.min())

    snn = SwarmNeuralNetwork(layer_sizes=[1, 32, 16, 8, 1],
                             fractal_methods=[sierpinski_fractal, mandelbrot_fractal, julia_fractal, julia_fractal])

    snn.train(X, y, epochs=epochs, batch_size=batch_size, l2_lambda=l2_lambda, patience=patience)

    y_pred = snn.forward(X, training=False)
    fractal_outputs = snn.apply_fractals(X)

    fig, axs = plt.subplots(2, 2, figsize=(15, 10))

    axs[0, 0].plot(X, y, label='True')
    axs[0, 0].plot(X, y_pred, label='Predicted')
    axs[0, 0].legend()
    axs[0, 0].set_title('True vs Predicted')

    # Avoid shadowing the training target `y` with the fractal trace coordinates.
    fx, fy = fractal_outputs[0]
    axs[0, 1].plot(fx, fy)
    axs[0, 1].set_title('Sierpinski Fractal Output')

    axs[1, 0].plot(X, fractal_outputs[1])
    axs[1, 0].set_title('Mandelbrot Fractal Output')

    axs[1, 1].plot(X, fractal_outputs[2])
    axs[1, 1].set_title('Julia Fractal Output')

    plt.tight_layout()
    return fig


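# Gradio UI: hyperparameter sliders, a run button, and the resulting figure.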
with gr.Blocks() as demo:
    epochs = gr.Slider(1, 10000, value=5000, step=1, label="Epochs")
    batch_size = gr.Slider(1, 100, value=32, step=1, label="Batch Size")
    # Minimum lowered so the default of 1e-5 lies inside the slider's range.
    l2_lambda = gr.Slider(0.00001, 0.1, value=0.00001, label="L2 Lambda")
    patience = gr.Slider(1, 1000, value=50, step=1, label="Patience")

    plot = gr.Plot()

    def update_plot(epochs, batch_size, l2_lambda, patience):
        return run_snn(epochs, batch_size, l2_lambda, patience)

    btn = gr.Button("Run SNN")
    btn.click(update_plot, inputs=[epochs, batch_size, l2_lambda, patience], outputs=plot)

demo.launch()