-
Notifications
You must be signed in to change notification settings - Fork 176
/
action.py
176 lines (145 loc) · 6.2 KB
/
action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Geneva superclass object for defining a packet-level action.
"""
import inspect
import importlib
import os
import sys
import actions.utils
ACTION_CACHE = {}
ACTION_CACHE["in"] = {}
ACTION_CACHE["out"] = {}
BASEPATH = os.path.sep.join(os.path.dirname(os.path.abspath(__file__)).split(os.path.sep)[:-1])
class Action():
"""
Defines the superclass for a Geneva Action.
"""
# Give each Action a unique ID - this is needed for graphing/visualization
ident = 0
# Each Action has a 'frequency' field - this defines how likely it is to be chosen
# when a new action is chosen
frequency = 0
def __init__(self, action_name, direction):
"""
Initializes this action object.
Args:
action_name (str): Name of this action ("duplicate")
direction (str): Direction of this action ("out", "both", "in")
"""
self.enabled = True
self.action_name = action_name
self.direction = direction
self.requires_undo = False
self.num_seen = 0
self.left = None
self.right = None
self.branching = False
self.terminal = False
self.ident = Action.ident
Action.ident += 1
def applies(self, direction):
"""
Returns whether this action applies to the given direction, as
branching actions are not supported on inbound trees.
Args:
direction (str): Direction to check if this action applies ("out", "in", "both")
Returns:
bool: whether or not this action can be used to a given direction
"""
if direction == self.direction or self.direction == "both":
return True
return False
def mutate(self, environment_id=None):
"""
Mutates packet.
"""
def __str__(self):
"""
Defines string representation of this action.
"""
return "%s" % (self.action_name)
@staticmethod
def get_actions(direction, disabled=None, allow_terminal=True):
"""
Dynamically imports all of the Action classes in this directory.
Will only return terminal actions if terminal is set to True.
Args:
direction (str): Limit imported actions to just those that can run to this direction ("out", "in", "both")
disabled (list, optional): list of actions that are disabled
allow_terminal (bool): whether or not terminal actions ("drop") should be imported
Returns:
dict: Dictionary of imported actions
"""
if disabled is None:
disabled = []
# Recursively call this function again to enumerate in and out actions
if direction.lower() == "both":
return list(set(Action.get_actions("in", disabled=disabled, allow_terminal=allow_terminal) + \
Action.get_actions("out", disabled=disabled, allow_terminal=allow_terminal)))
terminal = "terminal"
if not allow_terminal:
terminal = "non-terminal"
if terminal not in ACTION_CACHE[direction]:
ACTION_CACHE[direction][terminal] = {}
else:
return ACTION_CACHE[direction][terminal]
collected_actions = []
# Get the base path for the project relative to this file
path = os.path.join(BASEPATH, "actions")
for action_file in os.listdir(path):
if not action_file.endswith(".py"):
continue
action = action_file.replace(".py", "")
if BASEPATH not in sys.path:
sys.path.append(BASEPATH)
importlib.import_module("actions." + action)
def check_action(obj):
return inspect.isclass(obj) and \
issubclass(obj, actions.action.Action) and \
obj != actions.action.Action and \
obj().applies(direction) and \
obj().enabled and \
not any([x in str(obj) for x in disabled]) and \
(allow_terminal or not obj().terminal)
clsmembers = inspect.getmembers(sys.modules["actions."+action], predicate=check_action)
collected_actions += clsmembers
collected_actions = list(set(collected_actions))
ACTION_CACHE[direction][terminal] = collected_actions
return collected_actions
@staticmethod
def parse_action(str_action, direction, logger):
"""
Parses a string action into the action object.
Args:
str_action (str): String representation of an action to parse
direction (str): Limit actions searched through to just those that can run to this direction ("out", "in", "both")
logger (:obj:`logging.Logger`): a logger to log with
Returns:
:obj:`action.Action`: A parsed action object
"""
# Collect all viable actions that can run for each respective direction
outs = Action.get_actions("out")
ins = Action.get_actions("in")
# If we're currently parsing the OUT forest, only search the out-compatible actions
if direction == "out":
search = outs
# Otherwise only search in-compatible actions (no branching)
else:
search = ins
action_obj = None
data = None
# If this action has parameters (defined within {} attached to the action),
# split off the data parameters from the raw action name
if "{" in str_action:
str_action, data = str_action.split("{")
data = data.replace("}", "")
# Search through all of the actions available for this direction to find the right class
for action_name, action_cls in search:
if str_action.strip() and str_action.lower() in action_name.lower():
# Define the action, and give it a reference to its parent strategy
action_obj = action_cls()
# If this action has data, ask the new module to parse & initialize itself to it
if data:
# Pass our logger to the action to alert us if it can't parse something
action_obj.parse(data, logger)
return action_obj