# File size: 15,982 Bytes
# d03075d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import random
import numpy as np
import threading
import panel as pn
pn.extension(template='bootstrap')
import holoviews as hv
import time
import pandas as pd
from holoviews.streams import Stream
hv.extension('bokeh', logo=False)

# Particle class: one member of the swarm, carrying its own position, velocity and personal best
class Particle():
    """A single two-dimensional particle used by the particle swarm optimiser.

    Each particle remembers its current position and velocity, the error at
    its current position, and the best (lowest-error) position it has seen.
    """

    # Method to initialize particle properties
    def __init__(self, initial):
        """Create a particle at ``initial`` with a random starting velocity.

        Args:
            initial: Indexable holding at least two coordinates; the first
                two become the starting position.
        """
        self.num_dimensions = 2
        self.initial = initial
        self.position = [initial[i] for i in range(self.num_dimensions)]
        # One uniform draw in [-1, 1] per dimension
        self.velocity = [random.uniform(-1, 1) for _ in range(self.num_dimensions)]
        self.best_position = []  # Stays empty until the first fitness evaluation
        self.best_error = float('inf')  # Initialize best_error with infinity
        self.error = float('inf')  # Initialize error with infinity

    # Method to update velocity of a particle object
    def update_velocity(self, global_best_position, max_iter, iter_count):
        """Recompute the velocity from inertia, personal best and global best.

        The cognitive coefficient c1 moves linearly from 2.5 to 0.5 and the
        social coefficient c2 from 0.5 to 2.5 as ``iter_count`` approaches
        ``max_iter``.

        Args:
            global_best_position: Best position found by the whole swarm.
            max_iter: Iteration count at which the coefficient schedule ends.
            iter_count: Index of the current iteration.
        """
        c1_start, c1_end = 2.5, 0.5
        c2_start, c2_end = 0.5, 2.5
        w = 0.7298

        # Clamp the schedule to [0, 1]: without this, running past max_iter
        # drives c1 negative (pushing the particle away from its own best)
        # and lets c2 grow without limit. A non-positive max_iter is treated
        # as an already finished schedule instead of dividing by zero.
        if max_iter > 0:
            progress = min(max(iter_count / max_iter, 0.0), 1.0)
        else:
            progress = 1.0

        c1 = c1_start - (c1_start - c1_end) * progress
        c2 = c2_start + (c2_end - c2_start) * progress

        # Before the first fitness evaluation there is no personal best yet;
        # use the current position so the cognitive term is simply zero.
        personal_best = self.best_position if self.best_position else self.position

        for i in range(self.num_dimensions):
            r1 = random.random()
            r2 = random.random()

            cog_vel = c1 * r1 * (personal_best[i] - self.position[i])
            social_vel = c2 * r2 * (global_best_position[i] - self.position[i])
            self.velocity[i] = w * self.velocity[i] + cog_vel + social_vel

    # Method to update position of a particle object
    def update_position(self, bounds):
        """Move the particle by its velocity and clamp it to ``bounds``.

        Args:
            bounds: One ``(low, high)`` pair per dimension.
        """
        for i in range(self.num_dimensions):
            low, high = bounds[i][0], bounds[i][1]
            moved = self.position[i] + self.velocity[i]

            if moved > high:
                moved = high

            if moved < low:
                moved = low

            self.position[i] = moved

    # Method to evaluate fitness of a particle
    def evaluate_fitness(self, number, target, function):
        """Evaluate the error at the current position and track the best one.

        Args:
            number: ``1`` scores the position with ``fitness_function``
                against ``target``; any other value scores it with
                ``cost_function`` using ``function``.
            target: Target coordinates, used when ``number == 1``.
            function: Formula passed to ``cost_function`` otherwise.
        """
        if number == 1:
            self.error = fitness_function(self.position, target)
        else:
            self.error = cost_function(self.position, function)

        if self.error < self.best_error:
            self.best_position = self.position[:]  # Create a copy of the position list
            self.best_error = self.error

    # Getter method to return the present error of a particle
    def get_error(self):
        """Return the error computed by the latest fitness evaluation."""
        return self.error

    # Getter method to return the best position of a particle
    def get_best_pos(self):
        """Return a copy of the best position seen so far."""
        return self.best_position[:]  # Return a copy of the best position list

    # Getter method to return the best error of a particle
    def get_best_error(self):
        """Return the lowest error seen so far."""
        return self.best_error

    # Getter method to return the current position of a particle
    def get_pos(self):
        """Return a copy of the current position."""
        return self.position[:]  # Return a copy of the position list

    # Getter method to return the velocity of a particle
    def get_velocity(self):
        """Return a copy of the current velocity."""
        return self.velocity[:]  # Return a copy of the velocity list

# Function to calculate the squared euclidean distance from a particle to target
def fitness_function(particle_position, target):
    """Return the squared distance between a particle and the target.

    Args:
        particle_position: Indexable with the particle's x and y coordinates.
        target: Indexable with the target's x and y; each is passed through
            ``float`` before use.

    Returns:
        ``(tx - px)**2 + (ty - py)**2`` (no square root is taken).
    """
    target_x = float(target[0])
    target_y = float(target[1])
    dx = target_x - particle_position[0]
    dy = target_y - particle_position[1]
    return dx**2 + dy**2

# Function to calculate the value of the mathematical function at the position of a particle
import sympy as sp

# Parsed expressions keyed by their source string, so every formula is parsed only once
_PARSED_FUNCTIONS = {}


def cost_function(particle_position, function_str):
    """Evaluate a formula in ``x`` and ``y`` at a particle's position.

    The formula string is parsed with ``sp.sympify`` the first time it is
    seen and the parsed expression is reused afterwards; the optimiser calls
    this once per particle per iteration with the same string, so re-parsing
    on every call is wasted work.

    Args:
        particle_position: Indexable with the particle's x and y coordinates.
        function_str: Formula text using the symbols ``x`` and ``y``.

    Returns:
        The expression with ``x`` and ``y`` substituted by the coordinates.
    """
    expression = _PARSED_FUNCTIONS.get(function_str)
    if expression is None:
        # NOTE(review): sympify evaluates the text it is given; keep the
        # formula source restricted to trusted strings.
        expression = sp.sympify(function_str)
        _PARSED_FUNCTIONS[function_str] = expression

    x, y = sp.symbols('x y')
    return expression.subs({x: particle_position[0], y: particle_position[1]})

# Interactive Class: to create a swarm of particles and an interactive PSO
class Interactive_PSO():
    """Swarm controller whose particle coordinates can be polled while it runs.

    ``swarm_initialization`` runs the optimisation loop and keeps writing the
    latest particle coordinates into ``x_axis`` / ``y_axis``; the getters hand
    out copies of those lists so plotting code can read them at any time.
    """

    # Method to initialize properties of an Interactive PSO
    def __init__(self):
        """Set the default configuration and scatter the particles randomly."""
        self._running = False
        self.max_iter = 500  # Set the desired maximum number of iterations
        self.num_particles = 25
        self.initial = [5, 5]
        self.bounds = [(-500, 500), (-500, 500)]
        self.x_axis = []
        self.y_axis = []
        self.target = [5] * 2
        self.global_best_error = float('inf')  # Initialize global_best_error with infinity
        self.update_particles_position_lists_with_random_values()
        self.global_best_position = [0, 0]

    # Method to initialize swarm to find the target in a given search space
    def swarm_initialization(self, number, max_iter):
        """Build the swarm from the stored coordinates and run the PSO loop.

        The loop ends when ``terminate`` is called, when the global best
        error drops to 0.00001 or below, or after ``max_iter`` iterations.

        Args:
            number: Problem selector forwarded to
                ``Particle.evaluate_fitness`` (1 = distance to the target,
                anything else = the selected mathematical function).
            max_iter: Maximum number of iterations; also the length of the
                coefficient schedule used by ``Particle.update_velocity``.
        """
        # The flag lives at module level and is consumed by trigger_streams();
        # without this declaration the assignment below only created a local.
        global update_table

        self.global_best_position = [0, 0]
        self.global_best_error = float('inf')  # Initialize global_best_error with infinity
        self.gamma = 0.0001
        function = function_select.value

        # One particle per stored (x, y) coordinate pair
        swarm = [Particle([self.x_axis[i], self.y_axis[i]])
                 for i in range(self.num_particles)]

        iter_count = 0
        # max_iter is enforced here so the search cannot run forever when the
        # error threshold is never reached.
        while self._running and iter_count < max_iter:
            if self.global_best_error <= 0.00001:
                break

            for particle in swarm:
                particle.evaluate_fitness(number, self.target, function)

                if particle.get_error() < self.global_best_error:
                    self.global_best_position = particle.get_best_pos()
                    self.global_best_error = particle.get_best_error()

            for j, particle in enumerate(swarm):
                particle.update_velocity(self.global_best_position, max_iter, iter_count)
                particle.update_position(self.bounds)
                self.x_axis[j], self.y_axis[j] = particle.get_pos()

            # Add a delay to see the particle movement
            time.sleep(0.05)  # Adjust the delay as needed

            iter_count += 1

            # Update the table with the current global best position
            update_table = True
            hv.streams.Stream.trigger(table_dmap.streams)

        self.initial = self.global_best_position
        self._running = False
        print('Best Position:', self.global_best_position)
        print('Best Error:', self.global_best_error)
        print('Function:', function)

    # Method to terminate finding the solution of a problem
    def terminate(self):
        """Ask a running optimisation loop to stop before its next iteration."""
        self._running = False

    # Method to set _running parameter before initializing the swarm
    def starting(self):
        """Mark the swarm as running; call before ``swarm_initialization``."""
        self._running = True

    # Method to check if the swarm of particles are in action
    def isrunning(self):
        """Return True while the swarm is marked as running."""
        return self._running

    # Getter method to return the number of particles
    def get_num_particles(self):
        """Return the configured number of particles."""
        return self.num_particles

    # Setter method to update the number of particles
    def update_num_particles(self, new_value):
        """Set the number of particles (the coordinate lists are not resized)."""
        self.num_particles = new_value

    # Getter method to return the x_axis position list for particles in a swarm
    def get_xaxis(self):
        """Return a copy of the particles' x coordinates."""
        return self.x_axis[:]  # Return a copy

    # Getter method to return the y_axis position list for particles in a swarm
    def get_yaxis(self):
        """Return a copy of the particles' y coordinates."""
        return self.y_axis[:]  # Return a copy

    # Setter method to update the target position
    def set_target(self, x, y):
        """Store ``(x, y)`` as the new target position."""
        self.target = [x, y]

    # Getter method to return the target position
    def get_target(self):
        """Return a copy of the target position."""
        return self.target[:]  # Return a copy

    # Method to update the length of particles position lists if there is a change in num of particles
    def update_particles_position_lists(self, updated_num_particles):
        """Resize the coordinate lists in place to ``updated_num_particles``.

        Extra entries are dropped from the end; missing entries are filled
        with the list's first value. The resize is based on the actual list
        length, so it also works when the lists are out of step with
        ``num_particles``. An empty list is filled from ``self.initial``.

        Args:
            updated_num_particles: Desired number of entries per list.
        """
        target_len = max(updated_num_particles, 0)
        axes = ((self.x_axis, self.initial[0]), (self.y_axis, self.initial[1]))
        for axis, fallback in axes:
            if len(axis) > target_len:
                del axis[target_len:]
            else:
                fill = axis[0] if axis else fallback
                axis.extend([fill] * (target_len - len(axis)))

    # Method to initialize the particles positions randomly
    def update_particles_position_lists_with_random_values(self):
        """Replace both coordinate lists with distinct random integers.

        Values are sampled without replacement from ``[low, high)`` of the
        matching entry in ``self.bounds``.
        """
        (x_low, x_high), (y_low, y_high) = self.bounds
        self.x_axis = random.sample(range(int(x_low), int(x_high)), self.num_particles)
        self.y_axis = random.sample(range(int(y_low), int(y_high)), self.num_particles)

pso_swarm = Interactive_PSO()  # Swarm that hunts for the tapped target position
pso_computation_swarm = Interactive_PSO()  # Swarm that searches for the optimum of the selected mathematical function

# Flag checked and reset by trigger_streams(): when True, the table stream is triggered on the next periodic callback
update_table = False

# Thread target: run the target-finding swarm
def start_finding_the_target():
    """Run ``pso_swarm`` on problem 1 (distance to the target)."""
    pso_swarm.swarm_initialization(number=1, max_iter=pso_swarm.max_iter)

# Thread target: run the swarm that optimises the selected mathematical function
def start_computation():
    """Run ``pso_computation_swarm`` on problem 2 (the selected function)."""
    pso_computation_swarm.swarm_initialization(
        number=2,
        max_iter=pso_computation_swarm.max_iter,
    )

# Callback of the target dynamic map: stop the hunt, store the tapped position and draw the target
def create_target_element(x, y):
    """Return the target marker for ``(x, y)``.

    The target-finding swarm is stopped first; the stored target is only
    replaced when ``x`` is not None.
    """
    pso_swarm.terminate()
    if x is not None:
        pso_swarm.set_target(x, y)

    target_marker = hv.Points((x, y, 1), label='Target')
    return target_marker.opts(color='red', marker='^', size=10)

# Callback of the target-finding dynamic map: draw the current particles of pso_swarm
def update():
    """Return a scatter of ``pso_swarm``'s particles with random colour values."""
    xs = pso_swarm.get_xaxis()
    ys = pso_swarm.get_yaxis()
    # A fresh random value per particle drives the colour map
    colour_values = np.random.random(size=len(xs))

    scatter = hv.Scatter((xs, ys, colour_values), vdims=['y_axis', 'z'])
    scatter.opts(size=8, color='z', cmap='Coolwarm_r')
    return scatter

# Callback of the computation dynamic map: draw the current particles of pso_computation_swarm
def computational_update():
    """Return a scatter of ``pso_computation_swarm``'s particles with random colour values."""
    xs = pso_computation_swarm.get_xaxis()
    ys = pso_computation_swarm.get_yaxis()
    # A fresh random value per particle drives the colour map
    colour_values = np.random.random(size=len(xs))

    scatter = hv.Scatter((xs, ys, colour_values), vdims=['y_axis', 'z'])
    scatter.opts(size=8, color='z', cmap='Coolwarm_r')
    return scatter

# Click handler of the update button: apply the slider value to both swarms and re-seed them
def update_num_particles_event(event):
    """Resize both swarms to the slider value and redraw both plots.

    Nothing happens when the slider already matches the size of
    ``pso_swarm``. Otherwise both swarms are stopped, given the new size,
    re-seeded with random positions, and both scatter plots are refreshed.
    """
    if population_slider.value == pso_swarm.get_num_particles():
        return

    swarms = (pso_swarm, pso_computation_swarm)
    for swarm in swarms:
        swarm.terminate()
    time.sleep(1)  # Pause after asking the swarms to stop, before touching their lists

    updated_num_particles = population_slider.value
    # NOTE(review): this resize is overwritten by the random re-seed below
    pso_swarm.update_particles_position_lists(updated_num_particles)
    for swarm in swarms:
        swarm.update_num_particles(updated_num_particles)

    pso_computation_swarm.update_particles_position_lists_with_random_values()
    pso_swarm.update_particles_position_lists_with_random_values()  # Update positions for pso_swarm as well

    for dmap in (pso_scatter1, pso_scatter):
        hv.streams.Stream.trigger(dmap.streams)
    
# Periodic callback: push fresh data to the dynamic maps
def trigger_streams():
    """Refresh both scatter plots, the table (when flagged) and the target marker."""
    global update_table

    for dmap in (pso_scatter, pso_scatter1):
        hv.streams.Stream.trigger(dmap.streams)

    # The table is only refreshed when the flag is set; reset it before triggering
    if update_table:
        update_table = False
        hv.streams.Stream.trigger(table_dmap.streams)

    # Update the target position
    target_x, target_y = pso_swarm.get_target()
    tap.event(x=target_x, y=target_y)

    # Slow down the swarm's speed
    time.sleep(0.05)  # Adjust the delay as needed

# Click handler of the hunting button: start the target hunt on a background thread
def hunting_button_event(event):
    """Start the target-finding swarm unless it is already running."""
    if pso_swarm.isrunning():
        return

    pso_swarm.starting()
    worker = threading.Thread(target=start_finding_the_target)
    worker.start()

# Click handler of the computation button: start the function optimisation on a background thread
def computation_button_event(event):
    """Start the computation swarm unless it is already running."""
    if pso_computation_swarm.isrunning():
        return

    pso_computation_swarm.starting()
    worker = threading.Thread(target=start_computation)
    worker.start()

def table():
    """Return a one-row table with the rounded global best position of the computation swarm."""
    best = pso_computation_swarm.global_best_position
    rounded_columns = {
        'x_position': [round(best[0])],
        'y_position': [round(best[1])],
    }

    # Wrap the single row in an hv.Table with a fixed size
    return hv.Table(pd.DataFrame(rounded_columns)).opts(width=300, height=100)


# Watcher of the function dropdown: stop the computation swarm and re-seed its particles
def update_function(event):
    """Stop the computation swarm, wait a second, then scatter its particles randomly."""
    swarm = pso_computation_swarm
    swarm.terminate()
    time.sleep(1)  # Pause after asking the swarm to stop, before replacing its lists
    swarm.update_particles_position_lists_with_random_values()

    
# Two dynamic maps, one per swarm: pso_scatter draws the target-finding swarm via update(),
# pso_scatter1 draws the computation swarm via computational_update(). Each gets its own
# parameterless 'Next' stream, so a redraw happens whenever that stream is triggered.
pso_scatter = hv.DynamicMap(update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500), ylim=(-500, 500),
                                                                             title="Plot 2 : PSO for target finding ")
pso_scatter1 = hv.DynamicMap(computational_update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500),
                                                                                            ylim=(-500, 500),
                                                                                            title="Plot 1 : PSO for a mathematical computation")

# Single-tap stream, initialised with the current target of pso_swarm, feeding the
# dynamic map that stores and draws the target through create_target_element()
tap = hv.streams.SingleTap(x=pso_swarm.get_target()[0], y=pso_swarm.get_target()[1])
target_dmap = hv.DynamicMap(create_target_element, streams=[tap])

# Custom CSS styles for the container that holds the table and its label
custom_style = {
    'background': '#4287f5',  # Background color (a single '#': '##4287f5' is not a valid CSS color)
    'border': '1px solid black',  # Border around the table
    'padding': '8px',  # Padding inside the container
    'box-shadow': '5px 5px 5px #bcbcbc'  # Box shadow for a 3D effect
}

# Dynamic map that rebuilds the table of the global best position whenever its stream is triggered
table_dmap = hv.DynamicMap(table,streams=[hv.streams.Stream.define('Next')()])
table_label = pn.pane.Markdown("Once an optimal solution is found in plot 1 it is updated in the below table")

# Button to order the swarm of particles to start finding the target
start_hunting_button = pn.widgets.Button(name=' Click to find target for plot 2 ', width=50)
start_hunting_button.on_click(hunting_button_event)

# Button to order the swarm of particles to start computation for selected mathematical function
start_finding_button = pn.widgets.Button(name=' Click to start computation for plot 1', width=50)
start_finding_button.on_click(computation_button_event)

# Button to update number of particles
update_num_particles_button = pn.widgets.Button(name='Update number of particles', width=50)
update_num_particles_button.on_click(update_num_particles_event)

# Periodic callback that runs trigger_streams; Panel takes the period in milliseconds,
# so the value 3 requests a call every 3 ms (not every three seconds)
pn.state.add_periodic_callback(trigger_streams, 3)

# Slider to change the number of particles
population_slider = pn.widgets.IntSlider(name='Number of particles', start=10, end=100, value=25)

# Dropdown list to select a mathematical function
function_select = pn.widgets.Select(name='Select', options=['x^2+(y-100)^2','(x-234)^2+(y+100)^2', 'x^3 + y^3 - 3*x*y', 'x^2 * y^2'])
function_select.param.watch(update_function,'value')

# Overlay the particle scatter and the target marker so both are drawn in the same plot
plot_for_finding_the_target = pso_scatter*target_dmap    
    
# Build the layout: plot 1 (computation), plot 2 (target finding), then a column with the
# styled table container, the three buttons, the particle slider and the function dropdown
dashboard =  pn.Column(pn.Row(pn.Row(pso_scatter1.opts(width=500, height=500)), pn.Column(plot_for_finding_the_target.opts(width=500, height=500)),
                             pn.Column(pn.Column(table_label, table_dmap, styles=custom_style), start_finding_button, start_hunting_button, update_num_particles_button, population_slider,function_select)))

# Mark the dashboard as servable under the given page title
pn.panel(dashboard).servable(title='Swarm Particles Visualization')