import gradio as gr import numpy as np import matplotlib.pyplot as plt def relu(x): return np.maximum(0, x) def relu_derivative(x): return (x > 0).astype(float) def tanh(x): return np.tanh(x) def tanh_derivative(x): return 1 - np.tanh(x)**2 class EveOptimizer: def __init__(self, params, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8): self.params = params self.lr = learning_rate self.beta1 = beta1 self.beta2 = beta2 self.epsilon = epsilon self.t = 0 self.m = [np.zeros_like(p) for p in params] self.v = [np.zeros_like(p) for p in params] self.fractal_memory = [np.zeros_like(p) for p in params] def step(self, grads): self.t += 1 for i, (param, grad) in enumerate(zip(self.params, grads)): self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad ** 2) m_hat = self.m[i] / (1 - self.beta1 ** self.t) v_hat = self.v[i] / (1 - self.beta2 ** self.t) fractal_factor = self.fractal_adjustment(param, grad) self.fractal_memory[i] = 0.9 * self.fractal_memory[i] + 0.1 * fractal_factor param -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon) * self.fractal_memory[i] def fractal_adjustment(self, param, grad): c = np.mean(grad) + 1j * np.std(param) z = 0 for _ in range(10): z = z**2 + c if abs(z) > 2: break return 1 / (1 + abs(z)) class BatchNormalization: def __init__(self, input_shape): self.gamma = np.ones(input_shape) self.beta = np.zeros(input_shape) self.epsilon = 1e-5 self.moving_mean = np.zeros(input_shape) self.moving_var = np.ones(input_shape) def forward(self, x, training=True): if training: mean = np.mean(x, axis=0) var = np.var(x, axis=0) self.moving_mean = 0.99 * self.moving_mean + 0.01 * mean self.moving_var = 0.99 * self.moving_var + 0.01 * var else: mean = self.moving_mean var = self.moving_var x_norm = (x - mean) / np.sqrt(var + self.epsilon) out = self.gamma * x_norm + self.beta if training: self.cache = (x, x_norm, mean, var) return out def backward(self, dout): x, x_norm, mean, var = self.cache m = x.shape[0] dx_norm = dout * self.gamma dvar = np.sum(dx_norm * (x - mean) * -0.5 * (var + self.epsilon)**(-1.5), axis=0) dmean = np.sum(dx_norm * -1 / np.sqrt(var + self.epsilon), axis=0) + dvar * np.mean(-2 * (x - mean), axis=0) dx = dx_norm / np.sqrt(var + self.epsilon) + dvar * 2 * (x - mean) / m + dmean / m dgamma = np.sum(dout * x_norm, axis=0) dbeta = np.sum(dout, axis=0) return dx, dgamma, dbeta class Reward: def __init__(self): self.lowest_avg_batch_loss = float('inf') self.lowest_max_batch_loss = float('inf') self.best_weights = None def update(self, avg_batch_loss, max_batch_loss, network): improved = False if avg_batch_loss < self.lowest_avg_batch_loss: self.lowest_avg_batch_loss = avg_batch_loss improved = True if max_batch_loss < self.lowest_max_batch_loss: self.lowest_max_batch_loss = max_batch_loss improved = True if improved: self.best_weights = self.get_network_weights(network) def get_network_weights(self, network): weights = [] for layer in network.layers: layer_weights = [] for agent in layer.agents: agent_weights = { 'weights': agent.weights.copy(), 'bias': agent.bias.copy(), 'bn_gamma': agent.bn.gamma.copy(), 'bn_beta': agent.bn.beta.copy() } layer_weights.append(agent_weights) weights.append(layer_weights) return weights def apply_best_weights(self, network): if self.best_weights is not None: for layer, layer_weights in zip(network.layers, self.best_weights): for agent, agent_weights in zip(layer.agents, layer_weights): agent.weights = agent_weights['weights'].copy() agent.bias = agent_weights['bias'].copy() agent.bn.gamma = agent_weights['bn_gamma'].copy() agent.bn.beta = agent_weights['bn_beta'].copy() class Agent: def __init__(self, id, input_size, output_size, fractal_method): self.id = id self.weights = np.random.randn(input_size, output_size) * np.sqrt(2. / input_size) self.bias = np.zeros((1, output_size)) self.fractal_method = fractal_method self.bn = BatchNormalization((output_size,)) self.optimizer = EveOptimizer([self.weights, self.bias, self.bn.gamma, self.bn.beta]) def forward(self, x, training=True): self.last_input = x z = np.dot(x, self.weights) + self.bias z_bn = self.bn.forward(z, training) self.last_output = relu(z_bn) return self.last_output def backward(self, error, l2_lambda=1e-5): delta = error * relu_derivative(self.last_output) delta, dgamma, dbeta = self.bn.backward(delta) dw = np.dot(self.last_input.T, delta) + l2_lambda * self.weights db = np.sum(delta, axis=0, keepdims=True) self.optimizer.step([dw, db, dgamma, dbeta]) return np.dot(delta, self.weights.T) def apply_fractal(self, x): return self.fractal_method(x) class Swarm: def __init__(self, num_agents, input_size, output_size, fractal_method): self.agents = [Agent(i, input_size, output_size, fractal_method) for i in range(num_agents)] def forward(self, x, training=True): results = [agent.forward(x, training) for agent in self.agents] return np.mean(results, axis=0) def backward(self, error, l2_lambda): errors = [agent.backward(error, l2_lambda) for agent in self.agents] return np.mean(errors, axis=0) def apply_fractal(self, x): results = [agent.apply_fractal(x) for agent in self.agents] return np.mean(results, axis=0) class SwarmNeuralNetwork: def __init__(self, layer_sizes, fractal_methods): self.layers = [] for i in range(len(layer_sizes) - 2): self.layers.append(Swarm(num_agents=3, input_size=layer_sizes[i], output_size=layer_sizes[i+1], fractal_method=fractal_methods[i])) self.output_layer = Swarm(num_agents=1, input_size=layer_sizes[-2], output_size=layer_sizes[-1], fractal_method=fractal_methods[-1]) self.reward = Reward() def forward(self, x, training=True): self.layer_outputs = [x] for layer in self.layers: x = layer.forward(x, training) self.layer_outputs.append(x) self.final_output = tanh(self.output_layer.forward(x, training)) return self.final_output def backward(self, error, l2_lambda=1e-5): error = error * tanh_derivative(self.final_output) error = self.output_layer.backward(error, l2_lambda) for i in reversed(range(len(self.layers))): error = self.layers[i].backward(error, l2_lambda) def train(self, X, y, epochs, batch_size=32, l2_lambda=1e-5, patience=50): best_mse = float('inf') patience_counter = 0 for epoch in range(epochs): indices = np.arange(len(X)) np.random.shuffle(indices) self.reward.apply_best_weights(self) epoch_losses = [] for start_idx in range(0, len(X) - batch_size + 1, batch_size): batch_indices = indices[start_idx:start_idx+batch_size] X_batch = X[batch_indices] y_batch = y[batch_indices] output = self.forward(X_batch) error = y_batch - output error = np.clip(error, -1, 1) self.backward(error, l2_lambda) epoch_losses.append(np.mean(np.square(error))) avg_batch_loss = np.mean(epoch_losses) max_batch_loss = np.max(epoch_losses) self.reward.update(avg_batch_loss, max_batch_loss, self) mse = np.mean(np.square(y - self.forward(X, training=False))) if epoch % 100 == 0: print(f"Epoch {epoch}, MSE: {mse:.6f}, Avg Batch Loss: {avg_batch_loss:.6f}, Min Batch Loss: {np.min(epoch_losses):.6f}, Max Batch Loss: {max_batch_loss:.6f}") if mse < best_mse: best_mse = mse patience_counter = 0 else: patience_counter += 1 if patience_counter >= patience: print(f"Early stopping at epoch {epoch}") break return best_mse def apply_fractals(self, x): fractal_outputs = [] for i, layer in enumerate(self.layers): x = self.layer_outputs[i+1] fractal_output = layer.apply_fractal(x) fractal_outputs.append(fractal_output) return fractal_outputs def sierpinski_fractal(input_data): t = np.linspace(0, 2 * np.pi, input_data.shape[0]) x = np.mean(input_data) * np.cos(t) y = np.mean(input_data) * np.sin(t) return x, y def mandelbrot_fractal(input_data, max_iter=10): output = np.zeros(input_data.shape[0]) for i in range(input_data.shape[0]): c = input_data[i, 0] + 0.1j * np.std(input_data) z = 0 for n in range(max_iter): if abs(z) > 2: output[i] = n break z = z*z + c else: output[i] = max_iter return output def julia_fractal(input_data, max_iter=10): output = np.zeros(input_data.shape[0]) c = -0.8 + 0.156j for i in range(input_data.shape[0]): z = input_data[i, 0] + 0.1j * np.std(input_data) for n in range(max_iter): if abs(z) > 2: output[i] = n break z = z*z + c else: output[i] = max_iter return output def run_snn(epochs, batch_size, l2_lambda, patience): np.random.seed(42) X = np.linspace(0, 10, 1000).reshape(-1, 1) y = np.sin(X).reshape(-1, 1) X = (X - X.min()) / (X.max() - X.min()) y = (y - y.min()) / (y.max() - y.min()) snn = SwarmNeuralNetwork(layer_sizes=[1, 32, 16, 8, 1], fractal_methods=[sierpinski_fractal, mandelbrot_fractal, julia_fractal, julia_fractal]) snn.train(X, y, epochs=epochs, batch_size=batch_size, l2_lambda=l2_lambda, patience=patience) y_pred = snn.forward(X, training=False) fractal_outputs = snn.apply_fractals(X) fig, axs = plt.subplots(2, 2, figsize=(15, 10)) axs[0, 0].plot(X, y, label='True') axs[0, 0].plot(X, y_pred, label='Predicted') axs[0, 0].legend() axs[0, 0].set_title('True vs Predicted') x, y = fractal_outputs[0] axs[0, 1].plot(x, y) axs[0, 1].set_title('Sierpinski Fractal Output') axs[1, 0].plot(X, fractal_outputs[1]) axs[1, 0].set_title('Mandelbrot Fractal Output') axs[1, 1].plot(X, fractal_outputs[2]) axs[1, 1].set_title('Julia Fractal Output') plt.tight_layout() return fig with gr.Blocks() as demo: epochs = gr.Slider(1, 10000, value=5000, label="Epochs") batch_size = gr.Slider(1, 100, value=32, label="Batch Size") l2_lambda = gr.Slider(0.0001, 0.1, value=0.00001, label="L2 Lambda") patience = gr.Slider(1, 1000, value=50, label="Patience") plot = gr.Plot() def update_plot(epochs, batch_size, l2_lambda, patience): return run_snn(epochs, batch_size, l2_lambda, patience) btn = gr.Button("Run SNN") btn.click(update_plot, inputs=[epochs, batch_size, l2_lambda, patience], outputs=plot) demo.launch()