-
Notifications
You must be signed in to change notification settings - Fork 5
/
helpers.py
338 lines (283 loc) · 11.3 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import asn1
import json
import machine
import pycom
import sys
import time
import ubinascii as binascii
import urequests as requests
from network import WLAN, LTE
from ubirch import SimProtocol
from uuid import UUID
LED_GREEN = 0x002200
LED_YELLOW = 0x222200
LED_ORANGE = 0x442200
LED_RED = 0x7f0000
LED_PURPLE = 0x220022
def set_led(led_color):
pycom.heartbeat(False)
pycom.rgbled(led_color)
def wake_up() -> float:
set_led(LED_GREEN)
return time.time()
def sleep_until_next_interval(start_time, interval):
# wait for next interval
sleep_time = interval - int(time.time() - start_time)
if sleep_time > 0:
print(">> sleep for {} seconds".format(sleep_time))
set_led(0) # LED off
machine.idle()
time.sleep(sleep_time)
def reset():
print("Resetting device...")
time.sleep(3)
machine.reset()
def nb_iot_attach(lte: LTE, apn: str):
sys.stdout.write(">> attaching to LTE network ({})".format(apn))
# since we disable unsolicited CEREG messages in reset_modem(), as they interfere with AT communication with the SIM via CSIM commands,
# we are required to use an attach method that does not require cereg messages, for pycom that is legacyattach=false
lte.attach(band=8, apn=apn,legacyattach=False)
i = 0
while not lte.isattached() and i < 60:
machine.idle() # save power while waiting
time.sleep(1.0)
sys.stdout.write(".")
i += 1
print("")
if not lte.isattached():
raise Exception("unable to attach to LTE network")
print("-- attached: " + str(i) + "s")
def nb_iot_connect(lte: LTE):
sys.stdout.write(">> connecting to LTE network")
lte.connect() # start a data session and obtain an IP address
i = 0
while not lte.isconnected() and i < 30:
machine.idle() # save power while waiting
time.sleep(1.0)
sys.stdout.write(".")
i += 1
print("")
if not lte.isconnected():
raise Exception("unable to connect to LTE network")
print("-- connected ({}s)".format(i))
# print('-- IP address: ' + str(lte.ifconfig()))
def wifi_connect(wlan: WLAN, ssid: str, pwd: str):
print(">> connecting to WLAN network \"{}\"".format(ssid))
try:
wlan.connect(ssid, auth=(WLAN.WPA2, pwd), timeout=10000)
while not wlan.isconnected():
machine.idle() # save power while waiting
except OSError:
raise Exception("unable to connect to WLAN network \"{}\"".format(ssid))
print("-- connected")
print("-- IP address: " + str(wlan.ifconfig()))
def set_time():
rtc = machine.RTC()
i = 0
sys.stdout.write(">> setting time")
rtc.ntp_sync('185.15.72.251', 3600)
while not rtc.synced() and i < 30:
machine.idle() # save power while waiting
time.sleep(1.0)
sys.stdout.write(".")
i += 1
print("")
if not rtc.synced():
raise Exception("unable to set time")
print("-- current time: {}\n".format(rtc.now()))
def _send_at_cmd(lte, cmd) -> []:
result = []
for _ in range(3):
time.sleep(0.2)
print("++ " + cmd)
result = [k for k in lte.send_at_cmd(cmd).split('\r\n') if len(k.strip()) > 0]
print('-- ' + '\r\n-- '.join([r for r in result]))
if result[-1] == 'OK':
print()
break
return result
def set_modem_func_lvl(lte: LTE, func_lvl: int):
"""
Sets modem to the desired level of functionality
Throws exception if operation fails.
:param func_lvl: the functionality level (0: minimum,
1: full,
4: disable modem both transmit and receive RF circuits)
"""
get_func_cmd = "AT+CFUN?"
set_func_cmd = "AT+CFUN={}".format(func_lvl)
print("\n>> setting up modem")
modem_suspended = False
if lte.isconnected():
lte.pppsuspend()
modem_suspended = True
# check if modem is already set to the correct functionality level
result = _send_at_cmd(lte, get_func_cmd)
if result[-1] == 'OK' and result[-2] == '+CFUN: {}'.format(func_lvl):
if modem_suspended:
lte.pppresume()
return
# set modem functionality level
result = _send_at_cmd(lte, set_func_cmd)
if result[-1] == 'OK':
# check if modem is set and ready
result = _send_at_cmd(lte, get_func_cmd)
if result[-1] == 'OK' and result[-2] == '+CFUN: {}'.format(func_lvl):
if modem_suspended:
lte.pppresume()
return
if modem_suspended:
lte.pppresume()
raise Exception("setting up modem failed: {}".format(repr(result)))
def reset_modem(lte: LTE, debug_print=False):
function_level = "1"
cereg_level = "0"
if debug_print: print("\twaiting for reset to finish")
lte.reset()
lte.init()
if debug_print: print("\tsetting function level")
for tries in range(5):
_send_at_cmd(lte, "AT+CFUN=" + function_level)
result = _send_at_cmd(lte, "AT+CFUN?")
if result[0] == '+CFUN: ' + function_level:
break
else:
raise Exception("could not set modem function level")
if debug_print: print("\twaiting for SIM to be responsive")
for tries in range(10):
result = _send_at_cmd(lte, "AT+CIMI")
if result[-1] == 'OK':
break
else:
raise Exception("SIM does not seem to respond after reset")
if debug_print: print("\tdisabling CEREG messages")
# we disable unsolicited CEREG messages, as they interfere with AT communication with the SIM via CSIM commands
# this also requires to use an attach method that does not require cereg messages, for pycom that is legacyattach=false
for tries in range(5):
_send_at_cmd(lte, "AT+CEREG=" + cereg_level)
result = _send_at_cmd(lte, "AT+CEREG?")
if result[0][0:9] == '+CEREG: ' + cereg_level:
break
else:
raise Exception("could not set CEREG level")
def get_imsi(lte: LTE) -> str:
"""
Get the international mobile subscriber identity (IMSI) from SIM
"""
IMSI_LEN = 15
get_imsi_cmd = "AT+CIMI"
print("\n>> getting IMSI")
modem_suspended = False
if lte.isconnected():
lte.pppsuspend()
modem_suspended = True
result = _send_at_cmd(lte, get_imsi_cmd)
if modem_suspended:
lte.pppresume()
if result[-1] == 'OK' and len(result[0]) == IMSI_LEN:
return result[0]
raise Exception("getting IMSI failed: {}".format(repr(result)))
def lte_setup(lte: LTE, connect: bool, apn: str or None):
print(">> initializing LTE")
lte.init()
if connect:
if not lte.isattached():
nb_iot_attach(lte, apn)
if not lte.isconnected():
nb_iot_connect(lte)
def lte_shutdown(lte: LTE, detach=True):
if lte.isconnected():
print(">> disconnecting LTE")
lte.disconnect()
if detach and lte.isattached():
print(">> detaching LTE")
lte.detach()
print(">> de-initializing LTE")
lte.deinit(detach=False, reset=False)
def bootstrap(imsi: str, server: str, auth: str) -> str:
"""
Claim SIM identity at the ubirch backend and return SIM applet PIN to unlock crypto functionality.
Throws exception if bootstrapping fails.
:param imsi: the SIM international mobile subscriber identity (IMSI)
:param server: the bootstrap service URL
:param auth: the ubirch backend password
:param debug: enable debug output
:return: the PIN to authenticate against the SIM card with
"""
url = 'https://' + server + '/ubirch-web-ui/api/v1/devices/bootstrap'
headers = {
'X-Ubirch-IMSI': imsi,
'X-Ubirch-Credential': binascii.b2a_base64(auth).decode().rstrip('\n'),
'X-Ubirch-Auth-Type': 'ubirch'
}
r = requests.get(url, headers=headers)
if r.status_code == 200:
print(">> bootstrapping successful\n")
info = json.loads(r.content)
return info['pin']
else:
raise Exception("request to {} failed with status code {}: {}".format(url, r.status_code, r.text))
def _asn1tosig(data: bytes):
s1 = asn1.asn1_node_root(data)
a1 = asn1.asn1_node_first_child(data, s1)
part1 = asn1.asn1_get_value(data, a1)
a2 = asn1.asn1_node_next(data, a1)
part2 = asn1.asn1_get_value(data, a2)
if len(part1) > 32: part1 = part1[1:]
if len(part2) > 32: part2 = part2[1:]
return part1 + part2
def get_certificate(device_id: str, device_uuid: UUID, proto: SimProtocol) -> bytes:
"""
Get a signed json with the key registration request until CSR handling is in place.
"""
# TODO fix handling of key validity (will be fixed by handling CSR generation through SIM card)
TIME_FMT = '{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000Z'
now = machine.RTC().now()
created = not_before = TIME_FMT.format(now[0], now[1], now[2], now[3], now[4], now[5])
later = time.localtime(time.mktime(now) + 30758400)
not_after = TIME_FMT.format(later[0], later[1], later[2], later[3], later[4], later[5])
pub_base64 = binascii.b2a_base64(proto.get_key(device_id)).decode()[:-1]
# json must be compact and keys must be sorted alphabetically
REG_TMPL = '{{"algorithm":"ecdsa-p256v1","created":"{}","hwDeviceId":"{}","pubKey":"{}","pubKeyId":"{}","validNotAfter":"{}","validNotBefore":"{}"}}'
REG = REG_TMPL.format(created, str(device_uuid), pub_base64, pub_base64, not_after, not_before).encode()
# get the ASN.1 encoded signature and extract the signature bytes from it
signature = _asn1tosig(proto.sign(device_id, REG, 0x00))
return '{{"pubKeyInfo":{},"signature":"{}"}}'.format(REG.decode(),
binascii.b2a_base64(signature).decode()[:-1]).encode()
def register_key(server: str, auth: str, certificate: bytes) -> bytes:
url = 'https://' + server + '/api/keyService/v1/pubkey'
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(auth)
}
r = requests.post(url=url, headers=headers, data=certificate)
if r.status_code == 200:
print(">> key registration successful\n")
return r.content
else:
raise Exception("request to {} failed with status code {}: {}".format(url, r.status_code, r.text))
def post(server: str, uuid: UUID, auth: str, data: bytes) -> bytes:
url = 'https://' + server + '/'
headers = {
'X-Ubirch-Hardware-Id': str(uuid),
'X-Ubirch-Credential': binascii.b2a_base64(auth).decode().rstrip('\n'),
'X-Ubirch-Auth-Type': 'ubirch'
}
r = requests.post(url=url, data=data, headers=headers)
if r.status_code == 200:
print(">> successfully sent UPP\n")
return r.content
else:
raise Exception("request to {} failed with status code {}: {}".format(url, r.status_code, r.content))
def get_upp_payload(upp: bytes) -> bytes:
"""
Get the payload of a Ubirch Protocol Message
"""
if upp[0] == 0x95 and upp[1] == 0x22: # signed UPP
payload_start_idx = 23
elif upp[0] == 0x96 and upp[1] == 0x23: # chained UPP
payload_start_idx = 89
else:
raise Exception("!! can't get payload from {} (not a UPP)".format(binascii.hexlify(upp).decode()))
payload_len = upp[payload_start_idx - 1]
return upp[payload_start_idx:payload_start_idx + payload_len]