-
Notifications
You must be signed in to change notification settings - Fork 23
/
model.py
371 lines (328 loc) · 15 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
from graph import TrafficNetwork, Graph
import numpy as np
class TrafficFlowModel:
''' TRAFFIC FLOW ASSIGN MODEL
Inside the Frank-Wolfe algorithm is given, one can use
the method `solve` to compute the numerical solution of
User Equilibrium problem.
'''
def __init__(self, graph= None, origins= [], destinations= [],
demands= [], link_free_time= None, link_capacity= None):
self.__network = TrafficNetwork(graph= graph, O= origins, D= destinations)
# Initialization of parameters
self.__link_free_time = np.array(link_free_time)
self.__link_capacity = np.array(link_capacity)
self.__demand = np.array(demands)
# Alpha and beta (used in performance function)
self._alpha = 0.15
self._beta = 4
# Convergent criterion
self._conv_accuracy = 1e-5
# Boolean varible: If true print the detail while iterations
self.__detail = False
# Boolean varible: If true the model is solved properly
self.__solved = False
# Some variables for contemporarily storing the
# computation result
self.__final_link_flow = None
self.__iterations_times = None
def __insert_links_in_order(self, links):
''' Insert the links as the expected order into the
data structure `TrafficFlowModel.__network`
'''
first_vertice = [link[0] for link in links]
for vertex in first_vertice:
self.__network.add_vertex(vertex)
for link in links:
self.__network.add_edge(link)
def solve(self):
''' Solve the traffic flow assignment model (user equilibrium)
by Frank-Wolfe algorithm, all the necessary data must be
properly input into the model in advance.
(Implicitly) Return
------
self.__solved = True
'''
if self.__detail:
print(self.__dash_line())
print("TRAFFIC FLOW ASSIGN MODEL (USER EQUILIBRIUM) \nFRANK-WOLFE ALGORITHM - DETAIL OF ITERATIONS")
print(self.__dash_line())
print(self.__dash_line())
print("Initialization")
print(self.__dash_line())
# Step 0: based on the x0, generate the x1
empty_flow = np.zeros(self.__network.num_of_links())
link_flow = self.__all_or_nothing_assign(empty_flow)
counter = 0
while True:
if self.__detail:
print(self.__dash_line())
print("Iteration %s" % counter)
print(self.__dash_line())
print("Current link flow:\n%s" % link_flow)
# Step 1 & Step 2: Use the link flow matrix -x to generate the time, then generate the auxiliary link flow matrix -y
auxiliary_link_flow = self.__all_or_nothing_assign(link_flow)
# Step 3: Linear Search
opt_theta = self.__golden_section(link_flow, auxiliary_link_flow)
# Step 4: Using optimal theta to update the link flow matrix
new_link_flow = (1 - opt_theta) * link_flow + opt_theta * auxiliary_link_flow
# Print the detail if necessary
if self.__detail:
print("Optimal theta: %.8f" % opt_theta)
print("Auxiliary link flow:\n%s" % auxiliary_link_flow)
# Step 5: Check the Convergence, if FALSE, then return to Step 1
if self.__is_convergent(link_flow, new_link_flow):
if self.__detail:
print(self.__dash_line())
self.__solved = True
self.__final_link_flow = new_link_flow
self.__iterations_times = counter
break
else:
link_flow = new_link_flow
counter += 1
def _formatted_solution(self):
''' According to the link flow we obtained in `solve`,
generate a tuple which contains four elements:
`link flow`, `link travel time`, `path travel time` and
`link vehicle capacity ratio`. This function is exposed
to users in case they need to do some extensions based
on the computation result.
'''
if self.__solved:
link_flow = self.__final_link_flow
link_time = self.__link_flow_to_link_time(link_flow)
path_time = self.__link_time_to_path_time(link_time)
link_vc = link_flow / self.__link_capacity
return link_flow, link_time, path_time, link_vc
else:
return None
def report(self):
''' Generate the report of the result in console,
this function can be invoked only after the
model is solved.
'''
if self.__solved:
# Print the input of the model
print(self)
# Print the report
# Do the computation
link_flow, link_time, path_time, link_vc = self._formatted_solution()
print(self.__dash_line())
print("TRAFFIC FLOW ASSIGN MODEL (USER EQUILIBRIUM) \nFRANK-WOLFE ALGORITHM - REPORT OF SOLUTION")
print(self.__dash_line())
print(self.__dash_line())
print("TIMES OF ITERATION : %d" % self.__iterations_times)
print(self.__dash_line())
print(self.__dash_line())
print("PERFORMANCE OF LINKS")
print(self.__dash_line())
for i in range(self.__network.num_of_links()):
print("%2d : link= %12s, flow= %8.2f, time= %8.3f, v/c= %.3f" % (i, self.__network.edges()[i], link_flow[i], link_time[i], link_vc[i]))
print(self.__dash_line())
print("PERFORMANCE OF PATHS (GROUP BY ORIGIN-DESTINATION PAIR)")
print(self.__dash_line())
counter = 0
for i in range(self.__network.num_of_paths()):
if counter < self.__network.paths_category()[i]:
counter = counter + 1
print(self.__dash_line())
print("%2d : group= %2d, time= %8.3f, path= %s" % (i, self.__network.paths_category()[i], path_time[i], self.__network.paths()[i]))
print(self.__dash_line())
else:
raise ValueError("The report could be generated only after the model is solved!")
def __all_or_nothing_assign(self, link_flow):
''' Perform the all-or-nothing assignment of
Frank-Wolfe algorithm in the User Equilibrium
Traffic Assignment Model.
This assignment aims to assign all the traffic
flow, within given origin and destination, into
the least time consuming path
Input: link flow -> Output: new link flow
The input is an array.
'''
# LINK FLOW -> LINK TIME
link_time = self.__link_flow_to_link_time(link_flow)
# LINK TIME -> PATH TIME
path_time = self.__link_time_to_path_time(link_time)
# PATH TIME -> PATH FLOW
# Find the minimal traveling time within group
# (splited by origin - destination pairs) and
# assign all the flow to that path
path_flow = np.zeros(self.__network.num_of_paths())
for OD_pair_index in range(self.__network.num_of_OD_pairs()):
indice_grouped = []
for path_index in range(self.__network.num_of_paths()):
if self.__network.paths_category()[path_index] == OD_pair_index:
indice_grouped.append(path_index)
sub_path_time = [path_time[ind] for ind in indice_grouped]
min_in_group = min(sub_path_time)
ind_min = sub_path_time.index(min_in_group)
target_path_ind = indice_grouped[ind_min]
path_flow[target_path_ind] = self.__demand[OD_pair_index]
if self.__detail:
print("Link time:\n%s" % link_time)
print("Path flow:\n%s" % path_flow)
print("Path time:\n%s" % path_time)
# PATH FLOW -> LINK FLOW
new_link_flow = self.__path_flow_to_link_flow(path_flow)
return new_link_flow
def __link_flow_to_link_time(self, link_flow):
''' Based on current link flow, use link
time performance function to compute the link
traveling time.
The input is an array.
'''
n_links = self.__network.num_of_links()
link_time = np.zeros(n_links)
for i in range(n_links):
link_time[i] = self.__link_time_performance(link_flow[i], self.__link_free_time[i], self.__link_capacity[i])
return link_time
def __link_time_to_path_time(self, link_time):
''' Based on current link traveling time,
use link-path incidence matrix to compute
the path traveling time.
The input is an array.
'''
path_time = link_time.dot(self.__network.LP_matrix())
return path_time
def __path_flow_to_link_flow(self, path_flow):
''' Based on current path flow, use link-path incidence
matrix to compute the traffic flow on each link.
The input is an array.
'''
link_flow = self.__network.LP_matrix().dot(path_flow)
return link_flow
def _get_path_free_time(self):
''' Only used in the final evaluation, not the recursive structure
'''
path_free_time = self.__link_free_time.dot(self.__network.LP_matrix())
return path_free_time
def __link_time_performance(self, link_flow, t0, capacity):
''' Performance function, which indicates the relationship
between flows (traffic volume) and travel time on
the same link. According to the suggestion from Federal
Highway Administration (FHWA) of America, we could use
the following function:
t = t0 * (1 + alpha * (flow / capacity))^beta
'''
value = t0 * (1 + self._alpha * ((link_flow/capacity)**self._beta))
return value
def __link_time_performance_integrated(self, link_flow, t0, capacity):
''' The integrated (with repsect to link flow) form of
aforementioned performance function.
'''
val1 = t0 * link_flow
# Some optimization should be implemented for avoiding overflow
val2 = (self._alpha * t0 * link_flow / (self._beta + 1)) * (link_flow / capacity)**self._beta
value = val1 + val2
return value
def __object_function(self, mixed_flow):
''' Objective function in the linear search step
of the optimization model of user equilibrium
traffic assignment problem, the only variable
is mixed_flow in this case.
'''
val = 0
for i in range(self.__network.num_of_links()):
val += self.__link_time_performance_integrated(link_flow= mixed_flow[i], t0= self.__link_free_time[i], capacity= self.__link_capacity[i])
return val
def __golden_section(self, link_flow, auxiliary_link_flow, accuracy= 1e-8):
''' The golden-section search is a technique for
finding the extremum of a strictly unimodal
function by successively narrowing the range
of values inside which the extremum is known
to exist. The accuracy is suggested to be set
as 1e-8. For more details please refer to:
https://en.wikipedia.org/wiki/Golden-section_search
'''
# Initial params, notice that in our case the
# optimal theta must be in the interval [0, 1]
LB = 0
UB = 1
goldenPoint = 0.618
leftX = LB + (1 - goldenPoint) * (UB - LB)
rightX = LB + goldenPoint * (UB - LB)
while True:
val_left = self.__object_function((1 - leftX) * link_flow + leftX * auxiliary_link_flow)
val_right = self.__object_function((1 - rightX) * link_flow + rightX * auxiliary_link_flow)
if val_left <= val_right:
UB = rightX
else:
LB = leftX
if abs(LB - UB) < accuracy:
opt_theta = (rightX + leftX) / 2.0
return opt_theta
else:
if val_left <= val_right:
rightX = leftX
leftX = LB + (1 - goldenPoint) * (UB - LB)
else:
leftX = rightX
rightX = LB + goldenPoint*(UB - LB)
def __is_convergent(self, flow1, flow2):
''' Regard those two link flows lists as the point
in Euclidean space R^n, then judge the convergence
under given accuracy criterion.
Here the formula
ERR = || x_{k+1} - x_{k} || / || x_{k} ||
is recommended.
'''
err = np.linalg.norm(flow1 - flow2) / np.linalg.norm(flow1)
if self.__detail:
print("ERR: %.8f" % err)
if err < self._conv_accuracy:
return True
else:
return False
def disp_detail(self):
''' Display all the numerical details of each variable
during the iteritions.
'''
self.__detail = True
def set_disp_precision(self, precision):
''' Set the precision of display, which influences only
the digit of numerical component in arrays.
'''
np.set_printoptions(precision= precision)
def __dash_line(self):
''' Return a string which consistently
contains '-' with fixed length
'''
return "-" * 80
def __str__(self):
string = ""
string += self.__dash_line()
string += "\n"
string += "TRAFFIC FLOW ASSIGN MODEL (USER EQUILIBRIUM) \nFRANK-WOLFE ALGORITHM - PARAMS OF MODEL"
string += "\n"
string += self.__dash_line()
string += "\n"
string += self.__dash_line()
string += "\n"
string += "LINK Information:\n"
string += self.__dash_line()
string += "\n"
for i in range(self.__network.num_of_links()):
string += "%2d : link= %s, free time= %.2f, capacity= %s \n" % (i, self.__network.edges()[i], self.__link_free_time[i], self.__link_capacity[i])
string += self.__dash_line()
string += "\n"
string += "OD Pairs Information:\n"
string += self.__dash_line()
string += "\n"
for i in range(self.__network.num_of_OD_pairs()):
string += "%2d : OD pair= %s, demand= %d \n" % (i, self.__network.OD_pairs()[i], self.__demand[i])
string += self.__dash_line()
string += "\n"
string += "Path Information:\n"
string += self.__dash_line()
string += "\n"
for i in range(self.__network.num_of_paths()):
string += "%2d : Conjugated OD pair= %s, Path= %s \n" % (i, self.__network.paths_category()[i], self.__network.paths()[i])
string += self.__dash_line()
string += "\n"
string += f"Link-Path Incidence Matrix (Rank: {self.__network.LP_matrix_rank()}):\n"
string += self.__dash_line()
string += "\n"
string += str(self.__network.LP_matrix())
return string