File size: 15,982 Bytes
d03075d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
import random
import numpy as np
import threading
import panel as pn
pn.extension(template='bootstrap')
import holoviews as hv
import time
import pandas as pd
from holoviews.streams import Stream
hv.extension('bokeh', logo=False)
# Particle class: Each particle will be an object of this class with all the properties defined in __init__() method
class Particle():
# Method to initialize particle properties
def __init__(self, initial):
self.position = []
self.velocity = []
self.initial = initial
self.best_position = []
self.best_error = float('inf') # Initialize best_error with infinity
self.error = float('inf') # Initialize error with infinity
self.num_dimensions = 2
for i in range(0, self.num_dimensions):
self.velocity.append(random.uniform(-1, 1))
self.position.append(initial[i])
# Method to update velocity of a particle object
def update_velocity(self, global_best_position, max_iter, iter_count):
c1_start = 2.5
c1_end = 0.5
c2_start = 0.5
c2_end = 2.5
w = 0.7298
c1 = c1_start - (c1_start - c1_end) * (iter_count / max_iter)
c2 = c2_start + (c2_end - c2_start) * (iter_count / max_iter)
for i in range(0, self.num_dimensions):
r1 = random.random()
r2 = random.random()
cog_vel = c1 * r1 * (self.best_position[i] - self.position[i])
social_vel = c2 * r2 * (global_best_position[i] - self.position[i])
self.velocity[i] = w * self.velocity[i] + cog_vel + social_vel
# Method to update position of a particle object
def update_position(self, bounds):
for i in range(0, self.num_dimensions):
self.position[i] = self.position[i] + self.velocity[i]
if self.position[i] > bounds[i][1]:
self.position[i] = bounds[i][1]
if self.position[i] < bounds[i][0]:
self.position[i] = bounds[i][0]
# Method to evaluate fitness of a particle
def evaluate_fitness(self, number, target, function):
if number == 1:
self.error = fitness_function(self.position, target)
else:
self.error = cost_function(self.position, function)
if self.error < self.best_error:
self.best_position = self.position[:] # Create a copy of the position list
self.best_error = self.error
# Getter method to return the present error of a particle
def get_error(self):
return self.error
# Getter method to return the best position of a particle
def get_best_pos(self):
return self.best_position[:] # Return a copy of the best position list
# Getter method to return the best error of a particle
def get_best_error(self):
return self.best_error
# Getter method to return the best position of a particle
def get_pos(self):
return self.position[:] # Return a copy of the position list
# Getter method to return the velocity of a particle
def get_velocity(self):
return self.velocity[:] # Return a copy of the velocity list
# Function to calculate the euclidean distance from a particle to target
def fitness_function(particle_position, target):
x_pos, y_pos = float(target[0]), float(target[1])
return (x_pos - particle_position[0])**2 + (y_pos - particle_position[1])**2
# Function to calculate the value of the mathematical function at the position of a particle
import sympy as sp
def cost_function(particle_position, function_str):
x, y = sp.symbols('x y')
function = sp.sympify(function_str)
return function.subs({x: particle_position[0], y: particle_position[1]})
# Interactive Class: to create a swarm of particles and an interactive PSO
class Interactive_PSO():
# Method to initialize properties of an Interactive PSO
def __init__(self):
self._running = False
self.max_iter = 500 # Set the desired maximum number of iterations
self.num_particles = 25
self.initial = [5, 5]
self.bounds = [(-500, 500), (-500, 500)]
self.x_axis = []
self.y_axis = []
self.target = [5] * 2
self.global_best_error = float('inf') # Initialize global_best_error with infinity
self.update_particles_position_lists_with_random_values()
self.global_best_position = [0, 0]
# Method to initialize swarm to find the target in a given search space
# Method to initialize swarm to find the target in a given search space
def swarm_initialization(self, number, max_iter):
swarm = []
self.global_best_position = [0, 0]
self.global_best_error = float('inf') # Initialize global_best_error with infinity
self.gamma = 0.0001
function = function_select.value
for i in range(0, self.num_particles): # For loop to initialize the swarm of particles
swarm.append(Particle([self.x_axis[i], self.y_axis[i]]))
iter_count = 0
while self._running: # Loop to identify the best solution depending upon the problem
if self.global_best_error <= 0.00001:
break
for j in range(0, self.num_particles):
swarm[j].evaluate_fitness(number, self.target, function)
if swarm[j].get_error() < self.global_best_error:
self.global_best_position = swarm[j].get_best_pos()
self.global_best_error = swarm[j].get_best_error()
for j in range(0, self.num_particles):
swarm[j].update_velocity(self.global_best_position, max_iter, iter_count)
swarm[j].update_position(self.bounds)
self.x_axis[j] = swarm[j].get_pos()[0]
self.y_axis[j] = swarm[j].get_pos()[1]
# Add a delay to see the particle movement
time.sleep(0.05) # Adjust the delay as needed
iter_count += 1
# Update the table with the current global best position
update_table = True # <-- Set update_table to True
hv.streams.Stream.trigger(table_dmap.streams)
self.initial = self.global_best_position
self._running = False
print('Best Position:', self.global_best_position)
print('Best Error:', self.global_best_error)
print('Function:', function)
# Method to terminate finding the solution of a problem
def terminate(self):
self._running = False
# Method to set _running parameter before initializing the swarm
def starting(self):
self._running = True
# Method to check if the swarm of particles are in action
def isrunning(self):
return self._running
# Getter method to return the number of particles
def get_num_particles(self):
return self.num_particles
# Setter method to update the number of particles
def update_num_particles(self, new_value):
self.num_particles = new_value
# Getter method to return the x_axis position list for particles in a swarm
def get_xaxis(self):
return self.x_axis[:] # Return a copy
# Getter method to return the y_axis position list for particles in a swarm
def get_yaxis(self):
return self.y_axis[:] # Return a copy
# Setter method to update the target position
def set_target(self, x, y):
self.target = [x, y]
# Getter method to return the target position
def get_target(self):
return self.target[:] # Return a copy
# Method to update the length of particles position lists if there is a change in num of particles
def update_particles_position_lists(self, updated_num_particles):
old_x_value = self.x_axis[0]
old_y_value = self.y_axis[0]
if updated_num_particles > self.num_particles:
for i in range(self.num_particles, updated_num_particles):
self.x_axis.append(old_x_value)
self.y_axis.append(old_y_value)
else:
for i in range((self.num_particles) - 1, updated_num_particles - 1, -1):
self.x_axis.pop(i)
self.y_axis.pop(i)
# Method to initialize the particles positions randomly
def update_particles_position_lists_with_random_values(self):
self.x_axis = random.sample(range(-500, 500), self.num_particles)
self.y_axis = random.sample(range(-500, 500), self.num_particles)
pso_swarm = Interactive_PSO() # Creating an interactive PSO to find the target
pso_computation_swarm = Interactive_PSO() # Creating an interactive PSO to find the optimal solution of a mathematical function
update_table = False
# Method to initialize swarm to find the target in a given search space
def start_finding_the_target():
pso_swarm.swarm_initialization(1, pso_swarm.max_iter)
# Method to initialize swarm to compute an optimal solution for a given problem
def start_computation():
pso_computation_swarm.swarm_initialization(2, pso_computation_swarm.max_iter)
# On event function for single tap to create and return the target with updated position
def create_target_element(x, y):
pso_swarm.terminate()
if x is not None:
pso_swarm.set_target(x, y)
return hv.Points((x, y, 1), label='Target').opts(color='red', marker='^', size=10)
# Function to stream the particles of pso_swarm to dynamic map in regular intervals
def update():
x_axis = pso_swarm.get_xaxis()
y_axis = pso_swarm.get_yaxis()
data = (x_axis, y_axis, np.random.random(size=len(x_axis)))
pop_scatter = hv.Scatter(data, vdims=['y_axis', 'z'])
pop_scatter.opts(size=8, color='z', cmap='Coolwarm_r')
return pop_scatter
# On event function for update button click to update the number of particles in both the swarms
def computational_update():
x_axis = pso_computation_swarm.get_xaxis()
y_axis = pso_computation_swarm.get_yaxis()
data = (x_axis, y_axis, np.random.random(size=len(x_axis)))
pop_scatter1 = hv.Scatter(data, vdims=['y_axis', 'z'])
pop_scatter1.opts(size=8, color='z', cmap='Coolwarm_r')
return pop_scatter1
# On event function for update button click to update the number of particles in both the swarms
def update_num_particles_event(event):
if population_slider.value == pso_swarm.get_num_particles():
return
pso_swarm.terminate()
pso_computation_swarm.terminate()
time.sleep(1)
updated_num_particles = population_slider.value
pso_swarm.update_particles_position_lists(updated_num_particles)
pso_swarm.update_num_particles(updated_num_particles)
pso_computation_swarm.update_num_particles(updated_num_particles)
pso_computation_swarm.update_particles_position_lists_with_random_values()
pso_swarm.update_particles_position_lists_with_random_values() # Update positions for pso_swarm as well
hv.streams.Stream.trigger(pso_scatter1.streams)
hv.streams.Stream.trigger(pso_scatter.streams)
# Periodic Callback function for every 3 seconds to stream the data to dynamic maps
def trigger_streams():
global update_table
hv.streams.Stream.trigger(pso_scatter.streams)
hv.streams.Stream.trigger(pso_scatter1.streams)
if update_table:
update_table = False
hv.streams.Stream.trigger(table_dmap.streams)
# Update the target position
tap.event(x=pso_swarm.get_target()[0], y=pso_swarm.get_target()[1])
# Slow down the swarm's speed
time.sleep(0.05) # Adjust the delay as needed
# On event function for begin the hunting button click to start hunting for the target
def hunting_button_event(event):
if not pso_swarm.isrunning():
pso_swarm.starting()
threading.Thread(target=start_finding_the_target).start()
# On event function for start the computation button click to start computation for a mathematical function
def computation_button_event(event):
if not pso_computation_swarm.isrunning():
pso_computation_swarm.starting()
threading.Thread(target=start_computation).start()
def table():
position = pso_computation_swarm.global_best_position
df = pd.DataFrame({
'x_position': [round(position[0])],
'y_position': [round(position[1])]
})
# Create an hv.Table with the data
hv_table = hv.Table(df).opts(width=300, height=100)
return hv_table
# Function to update the mathematical function for which swarm finds the optimal solution
def update_function(event):
pso_computation_swarm.terminate()
time.sleep(1)
pso_computation_swarm.update_particles_position_lists_with_random_values()
# Two dynamic maps for two interactive PSOs, one for finding a target and one for computation of a mathematical function
pso_scatter = hv.DynamicMap(update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500), ylim=(-500, 500),
title="Plot 2 : PSO for target finding ")
pso_scatter1 = hv.DynamicMap(computational_update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500),
ylim=(-500, 500),
title="Plot 1 : PSO for a mathematical computation")
# Dynamic map to update and display target
tap = hv.streams.SingleTap(x=pso_swarm.get_target()[0], y=pso_swarm.get_target()[1])
target_dmap = hv.DynamicMap(create_target_element, streams=[tap])
# Define custom CSS styles for the table container
custom_style = {
'background': '##4287f5', # Background color
'border': '1px solid black', # Border around the table
'padding': '8px', # Padding inside the container
'box-shadow': '5px 5px 5px #bcbcbc' # Box shadow for a 3D effect
}
# Dynamic map to update the table with continuous global best position of the swarm
table_dmap = hv.DynamicMap(table,streams=[hv.streams.Stream.define('Next')()])
table_label = pn.pane.Markdown("Once an optimal solution is found in plot 1 it is updated in the below table")
# Button to order the swarm of particles to start finding the target
start_hunting_button = pn.widgets.Button(name=' Click to find target for plot 2 ', width=50)
start_hunting_button.on_click(hunting_button_event)
# Button to order the swarm of particles to start computation for selected mathematical function
start_finding_button = pn.widgets.Button(name=' Click to start computation for plot 1', width=50)
start_finding_button.on_click(computation_button_event)
# Button to update number of particles
update_num_particles_button = pn.widgets.Button(name='Update number of particles', width=50)
update_num_particles_button.on_click(update_num_particles_event)
# periodic callback for every three seconds to trigger streams method
pn.state.add_periodic_callback(trigger_streams, 3)
# Slider to change the number of particles
population_slider = pn.widgets.IntSlider(name='Number of praticles', start=10, end=100, value=25)
# Dropdown list to select a mathematical function
function_select = pn.widgets.Select(name='Select', options=['x^2+(y-100)^2','(x-234)^2+(y+100)^2', 'x^3 + y^3 - 3*x*y', 'x^2 * y^2'])
function_select.param.watch(update_function,'value')
#combining the dynamic maps with particles and target into one dynamicmap
plot_for_finding_the_target = pso_scatter*target_dmap
# Building the layout and returning the dashboard
dashboard = pn.Column(pn.Row(pn.Row(pso_scatter1.opts(width=500, height=500)), pn.Column(plot_for_finding_the_target.opts(width=500, height=500)),
pn.Column(pn.Column(table_label, table_dmap, styles=custom_style), start_finding_button, start_hunting_button, update_num_particles_button, population_slider,function_select)))
pn.panel(dashboard).servable(title='Swarm Particles Visualization') |