/
test_mlx5_crypto.py
473 lines (424 loc) · 21.2 KB
/
test_mlx5_crypto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
import unittest
import struct
import errno
import json
import os
from pyverbs.providers.mlx5.mlx5dv_mkey import Mlx5Mkey, Mlx5MrInterleaved, \
Mlx5MkeyConfAttr, Mlx5SigCrc, Mlx5SigBlockDomain, Mlx5SigBlockAttr
from pyverbs.providers.mlx5.mlx5dv_crypto import Mlx5CryptoLoginAttr, Mlx5DEK, \
Mlx5DEKInitAttr, Mlx5CryptoAttr, Mlx5CryptoLogin, Mlx5CryptoExtLoginAttr
from pyverbs.providers.mlx5.mlx5dv import Mlx5Context, Mlx5DVContextAttr, \
Mlx5DVQPInitAttr, Mlx5QP
from pyverbs.pyverbs_error import PyverbsRDMAError, PyverbsUserError
from tests.mlx5_base import Mlx5RDMATestCase, Mlx5PyverbsAPITestCase
import pyverbs.providers.mlx5.mlx5_enums as dve
from pyverbs.wr import SGE, SendWR, RecvWR
from pyverbs.qp import QPInitAttrEx, QPCap
from tests.base import RCResources
from pyverbs.pd import PD
import pyverbs.enums as e
import tests.utils as u
# Opaque value passed to Mlx5DEKInitAttr in the DEK management test and
# compared against the opaque returned by the DEK query.
DEK_OPAQUE = b'dek'
"""
Crypto operations require specific input from the user, e.g. the wrapped credential that the
device is configured with. The input should be provided in JSON format in a file at this path:
"/tmp/mlx5_crypto_test.txt".
The user can also point to a different file by setting this environment variable: MLX5_CRYPTO_TEST_INFO
This doc describes the input options, with examples of the format:
Mandatory:
Wrapped credential: The wrapped credential that was configured in the device.
Wrapped key: Wrapped key for the DEK creation. The key should be encrypted using the KEK
(Key Encryption Key).
Optional:
Wrapped 256 bits key: Wrapped key for the DEK creation when the key size of 256 bits is required.
If not provided, that test will only use the key in size of 128 bits.
Encrypted data for 512 of 'c': If a user wants to have data validation, he needs to provide
expected encrypted data for a plain text of 512 bytes of the
character 'c'. If not provided, data validation will be skipped.
Example of content of such file:
[{"credential": [8704278040424473809, 4403447855848063568, 13892768337045135232,
5942481448427925932, 171338997253969038, 5703425261028721211],
"wrapped_key": [contains 5 integers of 64bits each],
"plaintext_key": [contains 8 integers of 32bits each],
"plaintext_256_bits_key": [contains 16 integers of 32bits each],
"encrypted_data_for_512_c": [contains 64 integers of 64bits each],
"wrapped_256_bits_key": [contains 9 integers of 64bits each]}]
"""
def check_crypto_caps(dev_name, is_wrapped_dek_mode, multi_block_support=False):
    """
    Skip the calling test unless the device offers the requested crypto mode.
    An mlx5 context is opened on the device and its crypto capabilities are
    queried; every unmet requirement is reported through unittest.SkipTest.
    :param dev_name: The device name.
    :param is_wrapped_dek_mode: True when the test needs wrapped DEK mode and
                                False when it needs plaintext DEK mode.
    :param multi_block_support: If True, AES-XTS multi-block support is
                                required as well.
    :raises unittest.SkipTest: If the crypto selftests failed, AES-XTS is not
                               supported, multi-block was requested but is not
                               supported, or the device DEK mode differs from
                               the requested one.
    """
    ctx = Mlx5Context(Mlx5DVContextAttr(), name=dev_name)
    caps = ctx.query_mlx5_device().crypto_caps

    selftest_failures = caps['failed_selftests']
    if selftest_failures:
        raise unittest.SkipTest(f'The device crypto selftest failed ({selftest_failures})')

    has_single_block = \
        dve.MLX5DV_CRYPTO_ENGINES_CAP_AES_XTS_SINGLE_BLOCK & caps['crypto_engines']
    has_multi_block = \
        dve.MLX5DV_CRYPTO_ENGINES_CAP_AES_XTS_MULTI_BLOCK & caps['crypto_engines']
    if not (has_single_block or has_multi_block):
        raise unittest.SkipTest('The device crypto engines does not support AES')
    if multi_block_support and not has_multi_block:
        raise unittest.SkipTest('The device crypto engines does not support multi blocks')
    if has_multi_block:
        # Multi-block capability without single-block capability is treated
        # as an inconsistent capability report rather than a reason to skip.
        assert has_single_block, \
            'The device crypto engines do not support single block but support multi blocks'

    import_method = caps['wrapped_import_method']
    # The device counts as working in wrapped mode only when both the AES-XTS
    # wrapped import method and the "wrapped crypto operational" flag are set.
    wrapped_operational = \
        bool(dve.MLX5DV_CRYPTO_WRAPPED_IMPORT_METHOD_CAP_AES_XTS & import_method) and \
        bool(dve.MLX5DV_CRYPTO_CAPS_WRAPPED_CRYPTO_OPERATIONAL & caps['flags'])
    if is_wrapped_dek_mode and not wrapped_operational:
        raise unittest.SkipTest('The device does not support wrapped DEK')
    if not is_wrapped_dek_mode and wrapped_operational:
        raise unittest.SkipTest('The device does not support plaintext DEK')
def require_crypto_login_details(instance):
    """
    Load the crypto test details and store them in instance.crypto_details.
    The details are read from '/tmp/mlx5_crypto_test.txt', or from the path
    held by the MLX5_CRYPTO_TEST_INFO environment variable when it is set.
    The file must hold a JSON list; its first element is the object stored on
    the instance. If the test has to be skipped, instance.crypto_details is
    left untouched.
    :param instance: The test instance.
    :raises unittest.SkipTest: If the file doesn't exist, its content is not
                               in Json format or it is not a non-empty list.
    """
    crypto_file = os.environ.get('MLX5_CRYPTO_TEST_INFO', '/tmp/mlx5_crypto_test.txt')
    try:
        with open(crypto_file, 'r') as f:
            # Parse once; nothing is stored on the instance until the content
            # is known to be usable.
            parsed = json.load(f)
    except json.JSONDecodeError:
        raise unittest.SkipTest(f'The crypto data in {crypto_file} must be in Json format')
    except FileNotFoundError:
        raise unittest.SkipTest(f'Crypto login details must be supplied in {crypto_file}')
    if not isinstance(parsed, list) or not parsed:
        # Valid JSON of the wrong shape is a bad input file, not a test failure.
        raise unittest.SkipTest(f'The crypto data in {crypto_file} must be a non-empty Json list')
    instance.crypto_details = parsed[0]
def requires_crypto_support(is_wrapped_dek_mode, multi_block_support=False):
    """
    Build a decorator for test methods that need crypto support.
    Before the decorated test runs, the crypto details file is loaded into
    the test instance and the device crypto capabilities are checked; both
    steps may raise unittest.SkipTest. The wrapper keeps the name and the
    docstring of the decorated test.
    :param is_wrapped_dek_mode: True when wrapped_dek and False when plaintext.
    :param multi_block_support: If True, check for multi-block support.
    :return: A decorator for test methods that take only the test instance.
    """
    # Imported locally so that this function does not depend on the
    # module-level import list.
    import functools

    def outer(func):
        @functools.wraps(func)
        def inner(instance):
            require_crypto_login_details(instance)
            check_crypto_caps(instance.dev_name, is_wrapped_dek_mode, multi_block_support)
            return func(instance)
        return inner
    return outer
class Mlx5CryptoResources(RCResources):
    """
    RC resources that use an mlx5 DV QP and own two mkeys: one meant for
    encryption on the wire and one meant for encryption in memory.
    """
    def __init__(self, dev_name, ib_port, gid_index, dv_send_ops_flags=0,
                 mkey_create_flags=dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT,
                 msg_size=1024):
        """
        :param dev_name: Device name to be used.
        :param ib_port: IB port of the device to use.
        :param gid_index: Which GID index to use.
        :param dv_send_ops_flags: DV send ops flags for the mlx5 QP.
        :param mkey_create_flags: Flags used when creating the mkeys.
        :param msg_size: Message size, passed on to the base resources.
        """
        # These attributes are assigned before super().__init__() -
        # presumably the base initializer triggers the create_qp*() overrides
        # below, which read them (confirm against RCResources).
        self.send_ops_flags = e.IBV_QP_EX_WITH_SEND
        self.max_inline_data = 512
        self.mkey_create_flags = mkey_create_flags
        self.dv_send_ops_flags = dv_send_ops_flags
        super().__init__(dev_name, ib_port, gid_index, msg_size=msg_size)
        self.create_mkeys()

    def create_mkeys(self):
        """
        Create the wire-encryption and the memory-encryption mkeys, one entry
        each. The test is skipped when mkey creation is not supported.
        """
        try:
            self.wire_enc_mkey = Mlx5Mkey(self.pd, self.mkey_create_flags, max_entries=1)
            self.mem_enc_mkey = Mlx5Mkey(self.pd, self.mkey_create_flags, max_entries=1)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Create Mkey is not supported')

    def create_qp_cap(self):
        """
        :return: QP capabilities sized by num_msgs, with inline data enabled.
        """
        depth = self.num_msgs
        return QPCap(max_send_wr=depth, max_recv_wr=depth,
                     max_inline_data=self.max_inline_data)

    def create_qp_init_attr(self):
        """
        :return: Extended init attributes of an RC QP that uses the resources'
                 PD and CQ and the configured send ops flags.
        """
        comp_mask = e.IBV_QP_INIT_ATTR_PD | e.IBV_QP_INIT_ATTR_SEND_OPS_FLAGS
        qp_cap = self.create_qp_cap()
        return QPInitAttrEx(cap=qp_cap, pd=self.pd, scq=self.cq, rcq=self.cq,
                            qp_type=e.IBV_QPT_RC,
                            send_ops_flags=self.send_ops_flags,
                            comp_mask=comp_mask)

    def create_qps(self):
        """
        Create a single mlx5 DV QP and record it, its number and a zero PSN.
        The test is skipped when creating the DV QP is not supported.
        """
        try:
            init_attr_ex = self.create_qp_init_attr()
            dv_attr = Mlx5DVQPInitAttr(comp_mask=dve.MLX5DV_QP_INIT_ATTR_MASK_SEND_OPS_FLAGS,
                                       send_ops_flags=self.dv_send_ops_flags)
            dv_qp = Mlx5QP(self.ctx, init_attr_ex, dv_attr)
            self.qps.append(dv_qp)
            self.qps_num.append(dv_qp.qp_num)
            self.psns.append(0)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Create Mlx5DV QP is not supported')
class Mlx5CryptoAPITest(Mlx5PyverbsAPITestCase):
    """
    Test the crypto login and DEK management APIs.
    """
    def __init__(self, methodName='runTest'):
        super().__init__(methodName)
        # Filled by the requires_crypto_support decorator before a test runs.
        self.crypto_details = None

    def verify_create_dek_out_of_login_session(self):
        """
        Verify that creating a DEK outside of a crypto login session is not
        permitted: it must fail with EINVAL.
        """
        with self.assertRaises(PyverbsRDMAError) as raised:
            Mlx5DEK(self.ctx, self.dek_init_attr)
        self.assertEqual(raised.exception.error_code, errno.EINVAL)

    def verify_login_state(self, expected_state):
        """
        Query the session login state and verify that it's as expected.
        :param expected_state: The login state the context should report.
        """
        current_state = Mlx5Context.query_login_state(self.ctx)
        self.assertEqual(current_state, expected_state)

    def verify_login_twice(self):
        """
        Verify that a second login fails with EEXIST while a login session is
        already alive.
        """
        with self.assertRaises(PyverbsRDMAError) as raised:
            Mlx5Context.crypto_login(self.ctx, self.login_attr)
        self.assertEqual(raised.exception.error_code, errno.EEXIST)

    def verify_dek_opaque(self):
        """
        Query the DEK and verify that its opaque is as expected.
        """
        queried = self.dek.query()
        self.assertEqual(queried.opaque, DEK_OPAQUE)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_dek_management(self):
        """
        Test crypto login and DEK management APIs.
        The test also checks that invalid actions are not permitted, e.g.
        creating a DEK outside of a login session in wrapped DEK mode.
        """
        try:
            self.pd = PD(self.ctx)
            credential = struct.pack('!6Q', *self.crypto_details['credential'])
            wrapped_key = struct.pack('!5Q', *self.crypto_details['wrapped_key'])
            self.dek_init_attr = Mlx5DEKInitAttr(
                self.pd, key=wrapped_key,
                key_size=dve.MLX5DV_CRYPTO_KEY_SIZE_128,
                key_purpose=dve.MLX5DV_CRYPTO_KEY_PURPOSE_AES_XTS,
                opaque=DEK_OPAQUE)

            # Before the login: DEK creation must fail and no session exists.
            self.verify_create_dek_out_of_login_session()
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_NO_LOGIN)

            # Login to crypto session
            self.login_attr = Mlx5CryptoLoginAttr(credential)
            Mlx5Context.crypto_login(self.ctx, self.login_attr)
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_VALID)
            self.verify_login_twice()

            # Inside the session a DEK can be created, queried and closed.
            self.dek = Mlx5DEK(self.ctx, self.dek_init_attr)
            self.verify_dek_opaque()
            self.dek.close()

            # Logout from crypto session
            Mlx5Context.crypto_logout(self.ctx)
            self.verify_login_state(dve.MLX5DV_CRYPTO_LOGIN_STATE_NO_LOGIN)
        except PyverbsRDMAError as ex:
            print(ex)
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Create crypto elements is not supported')
class Mlx5CryptoTrafficTest(Mlx5RDMATestCase):
    """
    Test the mlx5 crypto APIs by running RC traffic over mkeys that are
    configured with crypto (and optionally signature) attributes.
    """
    def setUp(self):
        super().setUp()
        self.iters = 10
        # Filled by the requires_crypto_support decorator before a test runs.
        self.crypto_details = None
        self.validate_data = False
        self.is_multi_block = False
        self.msg_size = 1024
        self.key_size = dve.MLX5DV_CRYPTO_KEY_SIZE_128

    def create_client_dek(self):
        """
        Log in to a crypto session on the client context and create a wrapped
        DEK using the client resources.
        """
        cred_bytes = struct.pack('!6Q', *self.crypto_details['credential'])
        log_attr = Mlx5CryptoLoginAttr(cred_bytes)
        Mlx5Context.crypto_login(self.client.ctx, log_attr)
        self._create_wrapped_dek()

    def create_client_wrapped_dek_login_obj(self):
        """
        Create CryptoExtLoginAttr and a crypto login object, then create a
        wrapped DEK that refers to that login object.
        """
        cred_bytes = struct.pack('!6Q', *self.crypto_details['credential'])
        # 48 equals the byte length of the six packed 64-bit credential words
        # (presumably the credential length argument - confirm against
        # Mlx5CryptoExtLoginAttr).
        log_attr = Mlx5CryptoExtLoginAttr(cred_bytes, 48)
        crypto_login_obj = Mlx5CryptoLogin(self.client.ctx, log_attr)
        comp_mask = dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN
        self._create_wrapped_dek(comp_mask, crypto_login_obj)

    def _create_wrapped_dek(self, comp_mask=0, crypto_login_obj=None):
        """
        Create wrapped DEK using the client resources.
        :param comp_mask: When equal to MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN, the
                          DEK attributes carry the mask and the login object.
        :param crypto_login_obj: Crypto login object to attach to the DEK
                                 attributes; used only with the mask above.
        """
        # Pack only the wrapped key that matches the requested key size.
        if self.key_size == dve.MLX5DV_CRYPTO_KEY_SIZE_256:
            key = struct.pack('!9Q', *self.crypto_details['wrapped_256_bits_key'])
        else:
            key = struct.pack('!5Q', *self.crypto_details['wrapped_key'])
        dek_kwargs = {'key': key, 'key_size': self.key_size,
                      'key_purpose': dve.MLX5DV_CRYPTO_KEY_PURPOSE_AES_XTS}
        if comp_mask == dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN:
            dek_kwargs['comp_mask'] = comp_mask
            dek_kwargs['crypto_login'] = crypto_login_obj
        self.dek_attr = Mlx5DEKInitAttr(self.client.pd, **dek_kwargs)
        self.dek = Mlx5DEK(self.client.ctx, self.dek_attr)

    def create_client_plaintext_dek(self):
        """
        Create a plaintext DEK using the client resources. The DEK attributes
        set the crypto login mask with no login object.
        """
        if self.key_size == dve.MLX5DV_CRYPTO_KEY_SIZE_256:
            key = struct.pack('16I', *self.crypto_details['plaintext_256_bits_key'])
        else:
            key = struct.pack('8I', *self.crypto_details['plaintext_key'])
        self.dek_attr = Mlx5DEKInitAttr(self.client.pd, key=key,
                                        key_size=self.key_size,
                                        comp_mask=dve.MLX5DV_DEK_INIT_ATTR_CRYPTO_LOGIN,
                                        crypto_login=None)
        self.dek = Mlx5DEK(self.client.ctx, self.dek_attr)

    def reg_client_mkey(self, signature=False):
        """
        Configure the client mkeys with crypto attributes.
        The wire-encryption mkey covers the first half of the client MR and
        encrypts on TX; the memory-encryption mkey covers the second half and
        does not encrypt on TX.
        :param signature: True if signature configuration requested.
        """
        # Number of setters that follow wr_mkey_configure(): access flags,
        # layout and crypto, plus the signature block when requested.
        num_of_configuration = 4 if signature else 3
        # Integer division keeps the SGE address and length integral.
        half_size = self.client.msg_size // 2
        initial_tweak = struct.pack('!2Q', 0, 0)
        sign_crypto_order = dve.MLX5DV_SIGNATURE_CRYPTO_ORDER_SIGNATURE_BEFORE_CRYPTO_ON_TX
        for mkey in [self.client.wire_enc_mkey, self.client.mem_enc_mkey]:
            is_wire_mkey = mkey == self.client.wire_enc_mkey
            self.client.qp.wr_start()
            self.client.qp.wr_flags = e.IBV_SEND_SIGNALED | e.IBV_SEND_INLINE
            offset = 0 if is_wire_mkey else half_size
            sge = SGE(self.client.mr.buf + offset, half_size, self.client.mr.lkey)
            self.client.qp.wr_mkey_configure(mkey, num_of_configuration, Mlx5MkeyConfAttr())
            self.client.qp.wr_set_mkey_access_flags(e.IBV_ACCESS_LOCAL_WRITE)
            self.client.qp.wr_set_mkey_layout_list([sge])
            if signature:
                self.configure_mkey_signature()
            crypto_attr = Mlx5CryptoAttr(crypto_standard=dve.MLX5DV_CRYPTO_STANDARD_AES_XTS,
                                         encrypt_on_tx=is_wire_mkey,
                                         signature_crypto_order=sign_crypto_order,
                                         data_unit_size=dve.MLX5DV_BLOCK_SIZE_512,
                                         dek=self.dek, initial_tweak=initial_tweak)
            self.client.qp.wr_set_mkey_crypto(crypto_attr)
            self.client.qp.wr_complete()
            u.poll_cq(self.client.cq)

    def configure_mkey_signature(self):
        """
        Configure an mkey with signature attributes: a CRC32 signature on the
        wire domain with 512 bytes block size.
        """
        sig_crc = Mlx5SigCrc(crc_type=dve.MLX5DV_SIG_CRC_TYPE_CRC32, seed=0xFFFFFFFF)
        sig_block_domain = Mlx5SigBlockDomain(sig_type=dve.MLX5DV_SIG_TYPE_CRC, crc=sig_crc,
                                              block_size=dve.MLX5DV_BLOCK_SIZE_512)
        sig_attr = Mlx5SigBlockAttr(wire=sig_block_domain,
                                    check_mask=dve.MLX5DV_SIG_MASK_CRC32)
        self.client.qp.wr_set_mkey_sig_block(sig_attr)

    def get_send_wr(self, player, wire_encryption):
        """
        Build a send WR that sends half a message through one of the mkeys.
        :param player: The sending side resources.
        :param wire_encryption: True to use the wire-encryption mkey, False to
                                use the memory-encryption mkey.
        :return: The send WR.
        """
        mkey = player.wire_enc_mkey if wire_encryption else player.mem_enc_mkey
        # The SGE uses address 0 together with the mkey's lkey.
        sge = SGE(0, player.msg_size // 2, mkey.lkey)
        return SendWR(opcode=e.IBV_WR_SEND, num_sge=1, sg=[sge])

    def get_recv_wr(self, player, wire_encryption):
        """
        Build a receive WR that targets one half of the player's MR.
        :param player: The receiving side resources.
        :param wire_encryption: True to receive into the first half of the MR,
                                False to receive into the second half.
        :return: The receive WR.
        """
        half_size = player.msg_size // 2
        offset = 0 if wire_encryption else half_size
        sge = SGE(player.mr.buf + offset, half_size, player.mr.lkey)
        return RecvWR(sg=[sge], num_sge=1)

    def prepare_validate_data(self):
        """
        Fill the client MR for data validation: plain 'c' characters in the
        first half and the user-supplied expected encrypted data in the
        second half.
        """
        data_size = self.msg_size // 2
        self.client.mr.write('c' * data_size, data_size)
        if self.is_multi_block:
            encrypted_data = struct.pack('!128Q', *self.crypto_details['encrypted_data_for_1024_c'])
        else:
            encrypted_data = struct.pack('!64Q', *self.crypto_details['encrypted_data_for_512_c'])
        self.client.mr.write(encrypted_data, data_size, offset=data_size)

    def validate_crypto_data(self):
        """
        Validate the server MR data. Verify that the encryption/decryption works well:
        each half of the client MR must equal the opposite half of the server MR.
        """
        data_size = self.msg_size // 2
        send_msg = self.client.mr.read(self.msg_size, 0)
        recv_msg = self.server.mr.read(self.msg_size, 0)
        self.assertEqual(send_msg[0:data_size], recv_msg[data_size:self.msg_size])
        self.assertEqual(send_msg[data_size:self.msg_size], recv_msg[0:data_size])

    def init_data_validation(self):
        """
        Enable data validation when the crypto details hold the expected
        encrypted data that matches the current (single/multi block) mode.
        """
        expected_key = 'encrypted_data_for_1024_c' if self.is_multi_block \
            else 'encrypted_data_for_512_c'
        if expected_key in self.crypto_details:
            self.validate_data = True

    def traffic(self):
        """
        Perform RC traffic using the configured mkeys.
        Each iteration posts two receives on the server and two sends on the
        client, one per mkey, and polls both CQs. The data is validated after
        the last iteration when validation is enabled.
        """
        if self.validate_data:
            self.prepare_validate_data()
        for _ in range(self.iters):
            self.server.qp.post_recv(self.get_recv_wr(self.server, wire_encryption=True))
            self.server.qp.post_recv(self.get_recv_wr(self.server, wire_encryption=False))
            self.client.qp.post_send(self.get_send_wr(self.client, wire_encryption=True))
            self.client.qp.post_send(self.get_send_wr(self.client, wire_encryption=False))
            u.poll_cq(self.client.cq, count=2)
            u.poll_cq(self.server.cq, count=2)
        if self.validate_data:
            self.validate_crypto_data()

    def run_crypto_dek_test(self, create_dek_func):
        """
        Creates players and a DEK, using the given function, for different
        test cases, and runs traffic.
        :param create_dek_func: Function that creates a DEK.
        """
        self.init_data_validation()
        mkey_flags = dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_CRYPTO | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT
        self.create_players(Mlx5CryptoResources,
                            dv_send_ops_flags=dve.MLX5DV_QP_EX_WITH_MKEY_CONFIGURE,
                            mkey_create_flags=mkey_flags, msg_size=self.msg_size)
        create_dek_func()
        self.reg_client_mkey()
        self.traffic()

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_mkey_old_api(self):
        """
        Create Mkeys, register a memory layout using the mkeys, configure
        crypto attributes on it and then run traffic.
        """
        self.run_crypto_dek_test(self.create_client_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_wrapped_dek(self):
        """
        Create Mkeys, register a memory layout using the mkeys,
        configure crypto attributes on it using login object API and then run
        traffic.
        Use wrapped DEK with new API.
        """
        self.run_crypto_dek_test(self.create_client_wrapped_dek_login_obj)

    @requires_crypto_support(is_wrapped_dek_mode=False)
    def test_mlx5_crypto_plaintext_dek(self):
        """
        Create Mkeys, register a memory layout using the mkeys,
        configure crypto attributes on it using login object API and then run
        traffic.
        Use plaintext DEK.
        """
        self.run_crypto_dek_test(self.create_client_plaintext_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True)
    def test_mlx5_crypto_signature_mkey(self):
        """
        Create Mkeys, register a memory layout using this mkey, configure
        crypto and signature attributes on it and then perform traffic using
        this mkey.
        A 256 bits key is used when the crypto details supply one.
        """
        if 'wrapped_256_bits_key' in self.crypto_details:
            self.key_size = dve.MLX5DV_CRYPTO_KEY_SIZE_256
        mkey_flags = dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_CRYPTO | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_INDIRECT | \
            dve.MLX5DV_MKEY_INIT_ATTR_FLAGS_BLOCK_SIGNATURE
        self.create_players(Mlx5CryptoResources,
                            dv_send_ops_flags=dve.MLX5DV_QP_EX_WITH_MKEY_CONFIGURE,
                            mkey_create_flags=mkey_flags)
        self.create_client_dek()
        self.reg_client_mkey(signature=True)
        self.traffic()

    @requires_crypto_support(is_wrapped_dek_mode=False, multi_block_support=True)
    def test_mlx5_plaintext_dek_multi_block(self):
        """
        Run the crypto traffic flow with a plaintext DEK in multi-block mode,
        using a message size of 2048.
        """
        self.is_multi_block = True
        self.msg_size = 2048
        self.run_crypto_dek_test(self.create_client_plaintext_dek)

    @requires_crypto_support(is_wrapped_dek_mode=True, multi_block_support=True)
    def test_mlx5_wrapped_dek_multi_block(self):
        """
        Run the crypto traffic flow with a wrapped DEK created through the
        login object API in multi-block mode, using a message size of 2048.
        """
        self.is_multi_block = True
        self.msg_size = 2048
        self.run_crypto_dek_test(self.create_client_wrapped_dek_login_obj)