forked from meraki/automation-scripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
import_mx_l3.py
150 lines (126 loc) · 5.21 KB
/
import_mx_l3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python3
READ_ME = '''
=== PREREQUISITES ===
Run in Python 3
Install both requests & Meraki Dashboard API Python modules:
pip[3] install --upgrade requests
pip[3] install --upgrade meraki
=== DESCRIPTION ===
Imports CSV of MX L3 firewall rules into Dashboard appliance/combined network.
Note that if there is a final "default rule" with logging enabled, then a
syslog server needs to be configured on the Network-wide > General page.
=== USAGE ===
python import_mx_l3.py -k <api_key> -n <net_id> -f <file> [-m <mode>]
The -f parameter is the path to the CSV file with MX L3 firewall rules.
The optional -m parameter is either "simulate" (default) to only print changes,
or "commit" to also apply those changes to Dashboard.
'''
import csv
from datetime import datetime
import getopt
import logging
import sys
from meraki import meraki
# Prints READ_ME help message for user to read
def print_help():
lines = READ_ME.split('\n')
for line in lines:
print('# {0}'.format(line))
logger = logging.getLogger(__name__)
def configure_logging():
logging.basicConfig(
filename='{}_log_{:%Y%m%d_%H%M%S}.txt'.format(sys.argv[0].split('.')[0], datetime.now()),
level=logging.DEBUG,
format='%(asctime)s: %(levelname)7s: [%(name)s]: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def main(argv):
# Set default values for command line arguments
api_key = net_id = arg_file = arg_mode = None
# Get command line arguments
try:
opts, args = getopt.getopt(argv, 'hk:n:f:m:')
except getopt.GetoptError:
print_help()
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print_help()
sys.exit()
elif opt == '-k':
api_key = arg
elif opt == '-n':
net_id = arg
elif opt == '-f':
arg_file = arg
elif opt == '-m':
arg_mode = arg
# Check if all required parameters have been input
if api_key == None or net_id == None or arg_file == None:
print_help()
sys.exit(2)
# Assign default mode to "simulate" unless "commit" specified
if arg_mode != 'commit':
arg_mode = 'simulate'
# Read CSV input file, and skip header row
input_file = open(arg_file)
csv_reader = csv.reader(input_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
next(csv_reader, None)
logger.info('Reading file {0}'.format(arg_file))
# Loop through each firewall rule from CSV file and build PUT data
fw_rules = []
for row in csv_reader:
rule = dict({'policy': row[0], 'protocol': row[1], 'srcCidr': row[2], 'srcPort': row[3], 'destCidr': row[4], 'destPort': row[5], 'comment': row[6], 'syslogEnabled': (row[7] == True or row[7] == 'True' or row[7] == 'true')})
fw_rules.append(rule)
old_rules = list(fw_rules)
logger.info('Processed all {0} rules of file {1}'.format(len(fw_rules), arg_file))
# Check if last (default) rule exists, and if so, remove and check for default logging
default_rule_exists = False
default_logging = False
last_rule = {'comment': 'Default rule', 'policy': 'allow', 'protocol': 'Any', 'srcPort': 'Any', 'srcCidr': 'Any', 'destPort': 'Any', 'destCidr': 'Any'}
if all(item in fw_rules[-1].items() for item in last_rule.items()):
default_rule_exists = True
default_logging = (fw_rules.pop()['syslogEnabled'] == True)
# Update MX L3 firewall rules
if arg_mode == 'commit':
meraki.updatemxl3fwrules(api_key, net_id, fw_rules, default_logging)
logger.info('Attempting update of firewall rules to network {0}'.format(net_id))
# Confirm whether changes were successfully made
new_rules = meraki.getmxl3fwrules(api_key, net_id)
if default_rule_exists and new_rules[:-1] == old_rules[:-1]:
logger.info('Update successful!')
elif not(default_rule_exists) and new_rules[:-1] == old_rules:
logger.info('Update successful!')
else:
logger.error('Uh oh, something went wrong...')
else:
logger.info('Simulating update of firewall rules to network {0}'.format(net_id))
if __name__ == '__main__':
# Configure logging to stdout
configure_logging()
# Define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# Set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# Tell the handler to use this format
console.setFormatter(formatter)
# Add the handler to the root logger
logging.getLogger('').addHandler(console)
# Output to logfile/console starting inputs
start_time = datetime.now()
logger.info('Started script at {0}'.format(start_time))
inputs = sys.argv[1:]
try:
key_index = inputs.index('-k')
except ValueError:
print_help()
sys.exit(2)
inputs.pop(key_index+1)
inputs.pop(key_index)
logger.info('Input parameters: {0}'.format(inputs))
main(sys.argv[1:])
# Finish output to logfile/console
end_time = datetime.now()
logger.info('Ended script at {0}'.format(end_time))
logger.info(f'Total run time = {end_time - start_time}')