Solving cargo delivery problem using python

Asad Ali
9 min readDec 26, 2022
Photo by Wolfgang Hasselmann on Unsplash

Routing problems are one of the most essential computer science optimization problems used everywhere, from transportation, cargo delivery, robotics, Roomba robot cleaners, circuit manufacturers, etc.

Although there are various open source tools available such as Google-OR tools etc., in this article, we will build our optimizer from scratch based on custom solution requirements. This solution is implemented in Python 3 but can be translated into any language.

We will follow the following stages to build the solution

  1. Define the problem: In this case, you are trying to find the most efficient route for delivery vehicles to follow in order to deliver goods or services to a set of customers.
  2. Create a graph representation of the problem: You can do this by creating a graph object that consists of a set of nodes (representing the delivery points) and a set of edges (representing the roads or routes between the points).
  3. Determine the cost of travel between delivery points: You can create a cost matrix representing the cost of travel between each pair of delivery points. This is based on your custom requirement and the cost can represent various kinds of costs, distance or time-based.
  4. Choose an optimization algorithm: There are many different algorithms that can be used to solve routing problems, such as breadth-first search, Dijkstra’s algorithm, or a genetic algorithm. You will need to choose the one that best fits the needs of your problem.
  5. Implement the algorithm: Once you have chosen an algorithm, you will need to implement it in code in order to solve the problem
  6. Test and validate the solution: You will need to test your routing optimizer on a variety of different inputs to ensure that it is able to find the most efficient routes. You may also want to validate the solution by comparing it to known optimal routes or by benchmarking it against other optimization algorithms.
  7. Refine and improve the solution: You may find that your initial solution is not as efficient as you would like. In this case, you can try adjusting the parameters of your optimization algorithm or implementing a different algorithm in order to improve the solution.

Let's create a graph object using python default dict API, we can also use networkx or any other equivalent library

class Graph:
"""
Creates a direction graph with cost and a given number of nodes and edges
"""
def init(self):
self.nodes = []
self.edges = []
self.graph = defaultdict(list)

def __repr__(self) -> str:
return f"Graph with {len(self.nodes)} nodes and {len(self.edges)} edges"

def add_edge(self, u, v,cost):
self.nodes.append(u)
self.edges.append((u,v,cost))
self.graph[u].append(v)

def plot_graph(self):
G = nx.DiGraph()
G.add_nodes_from(self.nodes)
G.add_weighted_edges_from(self.edges)
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True)
labels = nx.get_edge_attributes(G,'weight')
nx.draw_networkx_edge_labels(G,pos,edge_labels=labels)
plt.show()

def __len__(self):
return len(self.nodes)

def __getitem__(self, node):
return self.graph[node]
def __iter__(self):
return iter(self.graph)
def __contains__(self, node):
return node in self.graph
def __setitem__(self, node, value):
self.graph[node] = value

For the first algorithm, I will simply take random permutations of the destination nodes and use a greedy approach. So lets create a simple permute function that just shuffles out the list of nodes

def random_permutation(list):
#remove first and last element from list and add after the list is shuffled
first = list.pop(0)
last = list.pop(-1)
#shuffle the list
random.shuffle(list)
#add first and last element back to the list
list.insert(0,first)
list.append(last)
return list

Now let's create a cost matrix; again, this will be based on your business case; in this scenario, I will just consider the fuel cost of transportation of cargo from one point to another let's say I have 12 stations to visit.

cost_matrix =  [[0, 2451, 713, 1018, 1631, 1374, 2408, 213, 2571, 875, 1420, 2145, 1972],
[2451, 0, 1745, 1524, 831, 1240, 959, 2596, 403, 1589, 1374, 357, 579],
[713, 1745, 0, 355, 920, 803, 1737, 851, 1858, 262, 940, 1453, 1260],
[1018, 1524, 355, 0, 700, 862, 1395, 1123, 1584, 466, 1056, 1280, 987],
[1631, 831, 920, 700, 0, 663, 1021, 1769, 949, 796, 879, 586, 371],
[1374, 1240, 803, 862, 663, 0, 1681, 1551, 1765, 547, 225, 887, 999],
[2408, 959, 1737, 1395, 1021, 1681, 0, 2493, 678, 1724, 1891, 1114, 701],
[213, 2596, 851, 1123, 1769, 1551, 2493, 0, 2699, 1038, 1605, 2300, 2099],
[2571, 403, 1858, 1584, 949, 1765, 678, 2699, 0, 1744, 1645, 653, 600],
[875, 1589, 262, 466, 796, 547, 1724, 1038, 1744, 0, 679, 1272, 1162],
[1420, 1374, 940, 1056, 879, 225, 1891, 1605, 1645, 679, 0, 1017, 1200],
[2145, 357, 1453, 1280, 586, 887, 1114, 2300, 653, 1272, 1017, 0, 504],
[1972, 579, 1260, 987, 371, 999, 701, 2099, 600, 1162, 1200, 504, 0]]

def create_graph(cost_matrix):
graph = Graph()
graph.init()
for i in range(len(cost_matrix)):
for j in range(len(cost_matrix)):
graph.add_edge(i,j,cost_matrix[i][j])
return graph

g = create_graph(cost_matrix)

Now let's use a simple breadth-first search to determine the possible paths from one city or node to another. Again the code below is self-explanatory

def possible_towns(graph,starting:int): 
"""
use BFS to do a search of possible towns
"""
seen_town = []
town_queue = [starting]
while town_queue:
town = town_queue.pop(0)
if town not in seen_town:
seen_town.append(town)
town_queue.extend(graph.graph[town])
#remove starting from seen_town
seen_town.remove(starting)
return seen_town

We can also use Dijkstra algorithms which are sort of like an extension of BFS as well to find closed paths. This time around, I will use networkx API, although we can also write our own Dijkstra by just modifying the above code a little bit.

def has_closed_path(G, source, destination):
# create a copy of the graph
H = G.copy()

# add an edge from the destination to the source with zero weight
H.add_edge(destination, source, weight=0)

# use Dijkstra's algorithm to find the shortest path from the source to the destination
try:
_, path_weight = nx.single_source_dijkstra(H, source, destination)
except nx.NetworkXNoPath:
# if there is no path from the source to the destination, return False
return False

# if the weight of the path from the source to the destination is non-zero, then a closed path exists
return path_weight > 0

Now let's write a function to calculate path cost, this is simply iterating in the list of edges getting the edge, and just adding up the cost

def cost_of_path(graph,path):
"""
given a path return the cost of the path
"""
cost = 0
for i in range(len(path)-1):
for edge in graph.edges:
if edge[0] == path[i] and edge[1] == path[i+1]:
cost += edge[2]
return cost

Now let's write our final two functions to write our first optimizer using greedy search. Obviously, this is not optimum, but a good start. To start my search, I will randomly generate multiple permutations and calculate the cost of the best route and keep going till I hit the number of trails limit.

def random_search_with_cost(graph,starting:int):
"""
given a graph and a starting point return a random path
"""

towns = possible_towns(graph,starting)
path = random_permutation(towns)
path.insert(0,starting)
path.append(starting)
#return cost of path

return cost_of_path(graph,path),path

def random_trails(n,graph,starting:int):
"""
given a number of trails and a graph return the best path
"""
best_path = []
best_cost = float('inf')
for i in range(n):
cost,path = random_search_with_cost(graph,starting)
if cost < best_cost:
best_cost = cost
best_path = path
return best_path,best_cost

Running the above will generally start to give good results after at least 10,000 iterations. To improve this algorithm, we can tinker a little bit with the way the algorithm selects the next best path on each iteration and the learning rate. All algorithms used in the real world for these sorts of problems perform some sort of optimization to limit the search space. e.g. evolutionary inspired ant-swarm optimization or genetic algorithms or simulated annealing, which is just a fancy word for converging learning rate while training.

Let's start with simulated annealing inspired by metal works and metallurgy. The main idea behind this approach is to use a higher learning rate at the start, probabilistically select new solutions, and as the algorithm reaches or gets to better paths in the search space to periodically decrease the learning rate using an error signal as a measure. So let's create the function of error and the learner. I will use something called as temperature, which is just a fancy word for denominator normalizer, to normalize the error, like how much I should change given the error in the feedback.

def compute_annealing_cost(error,temp):
import math
return math.exp(-error/temp)

def annealing(ntrails,graph,starting,possibletowns,ideal_cost=0,temp=100000):
towns = possibletowns
current_temp = temp
towns.insert(0,starting)
towns.append(starting)
best_towns = towns
best_solution = float('inf')
for _ in range(ntrails):
towns = random_permutation(towns)
error = cost_of_path(graph,towns) - ideal_cost
error_cost = compute_annealing_cost(error,current_temp)
if error_cost > random.random():
towns = best_towns

if error < best_solution:
best_solution = error
best_towns = towns
current_temp -= 0.5
return best_towns,best_solution

The above code runs slowly, on a larger machine with more cores, we can probably speed things up by using the python concurrency library and creating multiple threads. In case you are working on a larger solution of a much larger geography, we can divide your search space into multiple geographies and do a concurrent search. Executor submits API simply create a thread that runs the function with args on another core.

import concurrent.futures
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
future1 = executor.submit(annealing,100000,g,0,possibletowns)
future2 = executor.submit(annealing,100000,g,0,possibletowns)
path1 = future1.result()
path2 = future2.result()
print(path1)
print(path2)

Try using different values of temperature and temperature decrements to find the optimum one for your solution. Alternatively, we can also use genetic algorithms to solve our problem. Genetic algorithms are nature-inspired algorithms that follow the evolutionary cycle of crossover from two parents and then mutations. Basically, there are multiple ways of selecting parents based on different sub-algorithms as well as mutation methods, etc. However, in this article, I will simply take random parents from a generation of potential solutions and combine the first and last sections of the array of nodes in the path. Then for mutation, again, there are a ton of ways based on the problems to set up a mutation function, but in this example, I will simply use swapping as a mutation function, that is swapping the values of any two random indexes of the child. To set up the algorithms, we will create something called population, which is just a large sample of potential solutions and generation which is analogous to a number of total trials. Below is the function implementation, again this can be improved based on the specifics of the problems you are trying to solve.

def genetic_algorithm(generations, population_size,mutation_rate=0.2):

#given the number of generations and the population size return the best path

#create the initial population
population = []
for _ in range(population_size):
population.append(random_permutation(possibletowns))
#loop through the generations
for _ in range(generations):
#create the next generation
next_generation = []
#loop through the population
for _ in range(population_size):
#select two parents
parent1 = random.choice(population)

parent2 = random.choice(population)
#create a child
child = crossover(parent1,parent2)
#mutate the child
child = mutate(child,mutation_rate)
#add the child to the next generation only if it has unique cities
if len(set(child)) == len(child):
next_generation.append(child)
#keep only sequence with unique cities

#set the population to the next generation
population = next_generation
#get the best path
#select population with unique cities only

best_path = min(population,key=total_distance)
#return the best path
return best_path

#best_path = genetic_algorithm(generations,population_size)
#best_path.insert(0,0)
#best_path.append(0)
#print(best_path)
#print(total_distance(genetic_algorithm(generations,population_size)))

Some of the other options we can use include some heuristics-based approaches such as LIN-KERNIGHAN function. This is actually similar to our first solution using a random greedy search. It uses a divide and conquer kind of strategy to divide up solution space into multiple segments and then perform a search for the best path. Below is a simplified form

def divide_routes(route,i,j):
"""
given a route and two indices i and j return a new route with the cities swapped
"""
#get the first part of the route
first_part = route[:i]
#get the second part of the route
second_part = route[i:j]
#reverse the second part
second_part.reverse()
#get the third part of the route
third_part = route[j:]
#combine the three parts
new_route = first_part + second_part + third_part
#return the new route
return new_route


def optimizer(route):
"""
given a route return a new route with the cities swapped
"""
#get the length of the route
length = len(route)
#create a new route
new_route = route[:]
#loop through the route
for i in range(length-1):
#loop through the route
for j in range(i+1,length):
#get the new route
new_route = two_opt_swap(new_route,i,j)
#check if the new route is better than the old route
if total_distance(new_route) < total_distance(route):
#set the route to the new route
route = new_route
#return the route
return route

--

--

Asad Ali

Data Science, Analytics, and Machine Learning Professional.