Spaces:
Sleeping
Sleeping
File size: 4,994 Bytes
719d0db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import numpy as np
from scipy.spatial import distance
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
class ORToolsBase():
def __init__(self, large_value=1e+6, scaling=False):
self.coord_dim = 2
self.large_value = large_value
self.scaling = scaling
def preprocess_data(self, node_feats, dist_matrix=None):
if self.scaling:
node_feats = self.scaling_feats(node_feats)
data = {}
# convert set of corrdinates to a distance matrix
if dist_matrix is None:
coords = node_feats["coords"]
_dist_matrix = distance.cdist(coords, coords, 'euclidean').astype(np.int64)
else:
_dist_matrix = dist_matrix
if self.scaling:
_dist_matrix *= self.large_value
data['distance_matrix'] = _dist_matrix.tolist()
data['num_vehicles'] = 1
data['depot'] = 0
if "service_time" in node_feats:
data["service_time"] = node_feats["service_time"].astype(np.int64).tolist()
else:
data["service_time"] = np.zeros(len(data['distance_matrix']), dtype=np.int64).tolist()
return node_feats, data
def scaling_feats(self, node_feats):
raise NotImplementedError
def solve(self, node_feats, fixed_path=None, dist_matrix=None, instance_name=None):
"""
Paramters
---------
node_feats: np.array [num_nodes x node_dim]
the first (coord_dim) dims are coord_dim
fixed_path:
Returns
-------
tour: np.array [seq_length]
"""
node_feats, data = self.preprocess_data(node_feats, dist_matrix)
# Create the routing index manager.
manager = pywrapcp.RoutingIndexManager(
len(data['distance_matrix']), data['num_vehicles'], data['depot'])
# Create Routing Model.
routing = pywrapcp.RoutingModel(manager)
def distance_callback(from_index, to_index):
"""Returns the distance between the two nodes."""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return data['distance_matrix'][from_node][to_node] + data["service_time"][from_node]
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
# Define cost of each arc.
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
self.add_constraints(routing, transit_callback_index, manager, data, node_feats)
# fix a partial path
if fixed_path is not None:
fixed_path = self.index2node(fixed_path, manager)
print(fixed_path)
routing.CloseModel() # <- very important. this should be called at last
# ApplyLocks supports only single vehicle
# routing.ApplyLocks(fixed_path)
# As ApplyLocksToAllVehicles does not contain depot,
# remove depot id from fixed_path (first element of fixed_path is alwawys depot_id)
routing.ApplyLocksToAllVehicles([fixed_path[1:]], False)
# Setting first solution heuristic.
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
search_parameters.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
search_parameters.time_limit.seconds = 5# int(1 * 60) #120
# Solve the problem.
assignment = routing.SolveWithParameters(search_parameters)
if assignment is None:
return
# assert assignment is not None, "Found no solution."
return self.get_tour_list(data, manager, routing, assignment)
def index2node(self, path, manager):
mod_path = []
for i in range(len(path)):
mod_path.append(manager.IndexToNode(path[i].item(0)))
return mod_path
def get_tour_list(self, data, manager, routing, solution):
"""
Returns
-------
tour: 2d list [num_vehicles x seq_length]
"""
num_vehicles = data["num_vehicles"]
tour = [[] for _ in range(num_vehicles)]
for vehicle_id in range(num_vehicles):
index = routing.Start(vehicle_id)
while not routing.IsEnd(index):
tour[vehicle_id].append(manager.IndexToNode(index))
index = solution.Value(routing.NextVar(index))
tour[vehicle_id].append(manager.IndexToNode(index))
# remove unused vehicles
tour = [vehicle_tour for vehicle_tour in tour if len(vehicle_tour) > 2]
return tour
def add_constraints(self, routing, transit_callback_index, manager, data, node_feats):
pass |