forked from ucla-mobility/OpenCDA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cav_world.py
142 lines (112 loc) · 3.86 KB
/
cav_world.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# -*- coding: utf-8 -*-
# Author: Runsheng Xu <[email protected]>
# License: TDG-Attribution-NonCommercial-NoDistrib
import importlib
class CavWorld(object):
    """
    A customized world object to save all CDA vehicle
    information and shared ML models. During co-simulation,
    it is also used to save the sumo-carla id mapping.

    Parameters
    ----------
    apply_ml : bool
        Whether to apply ml/dl models in this simulation. Please make sure
        you have installed torch/sklearn before setting this to True.

    Attributes
    ----------
    vehicle_id_set : set
        A set that stores vehicle IDs.

    _vehicle_manager_dict : dict
        A dictionary that stores vehicle managers, keyed by their ``vid``.

    _platooning_dict : dict
        A dictionary that stores platooning managers, keyed by their
        ``pmid``.

    _rsu_manager_dict : dict
        A dictionary that stores RSU managers, keyed by their ``rid``.

    ml_manager : opencda object.
        The machine learning manager class. ``None`` when ``apply_ml``
        is False.

    sumo2carla_ids : dict
        The sumo id to carla id mapping. Only used when co-simulation
        is activated.
    """

    def __init__(self, apply_ml=False):
        self.vehicle_id_set = set()
        self._vehicle_manager_dict = {}
        self._platooning_dict = {}
        self._rsu_manager_dict = {}
        self.ml_manager = None

        if apply_ml:
            # we import in this way so the user doesn't need to install ml
            # packages unless they are required
            ml_manager = getattr(importlib.import_module(
                "opencda.customize.ml_libs.ml_manager"), 'MLManager')
            # initialize the ml manager to load the DL/ML models into memory
            self.ml_manager = ml_manager()

        # this is used only when co-simulation is activated.
        self.sumo2carla_ids = {}

    def update_vehicle_manager(self, vehicle_manager):
        """
        Update created CAV manager to the world.

        Parameters
        ----------
        vehicle_manager : opencda object
            The vehicle manager class. Its ``vehicle.id`` is recorded in
            ``vehicle_id_set`` and the manager itself is stored under its
            ``vid``.
        """
        self.vehicle_id_set.add(vehicle_manager.vehicle.id)
        self._vehicle_manager_dict[vehicle_manager.vid] = vehicle_manager

    def update_platooning(self, platooning_manger):
        """
        Add created platooning.

        Parameters
        ----------
        platooning_manger : opencda object
            The platooning manager class. It is stored under its ``pmid``.
        """
        # NOTE: the parameter name keeps its original spelling so that
        # existing keyword-argument callers are not broken.
        self._platooning_dict[platooning_manger.pmid] = platooning_manger

    def update_rsu_manager(self, rsu_manager):
        """
        Add rsu manager.

        Parameters
        ----------
        rsu_manager : opencda object
            The RSU manager class. It is stored under its ``rid``.
        """
        self._rsu_manager_dict[rsu_manager.rid] = rsu_manager

    def update_sumo_vehicles(self, sumo2carla_ids):
        """
        Update the sumo carla mapping dict. This is only called
        when cosimulation is conducted.

        Parameters
        ----------
        sumo2carla_ids : dict
            Key is sumo id and value is carla id. The given dict replaces
            the stored mapping (it is not copied or merged).
        """
        self.sumo2carla_ids = sumo2carla_ids

    def get_vehicle_managers(self):
        """
        Return vehicle manager dictionary.
        """
        return self._vehicle_manager_dict

    def get_platoon_dict(self):
        """
        Return existing platoons.
        """
        return self._platooning_dict

    def locate_vehicle_manager(self, loc):
        """
        Locate the vehicle manager based on the given location.

        Parameters
        ----------
        loc : carla.Location
            Vehicle location. Only its ``x`` and ``y`` are compared.

        Returns
        -------
        target_vm : opencda object
            The first registered vehicle manager whose ego position has
            exactly the same x and y as ``loc``, or None if there is no
            such manager.
        """
        for vm in self._vehicle_manager_dict.values():
            # read the ego pose once per vehicle instead of once per axis
            ego_location = vm.localizer.get_ego_pos().location
            # exact float equality is kept on purpose: this preserves the
            # original matching behavior (no tolerance is applied).
            if loc.x == ego_location.x and loc.y == ego_location.y:
                return vm

        return None