import logging
from unittest import TestCase
from hvac import exceptions
from tests import utils
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase
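class TestKey(HvacIntegrationTestCase, TestCase):
    """Integration tests for Vault's ``sys`` key-management endpoints.

    Covers root-token generation (driven to completion, and cancelled),
    encryption-key rotation, rekeying with and without verification, and the
    backed-up keys endpoint. Every test talks to the live Vault instance that
    ``HvacIntegrationTestCase`` exposes as ``self.client`` / ``self.manager``.

    NOTE(review): these tests handle real secrets -- unseal keys, a freshly
    generated root token and the generate-root OTP. None of them is written
    to the log: test output is routinely captured by CI and retained or
    shared, so a ``logging.debug`` of a root token is a credential leak.
    """

    def test_start_generate_root_with_completion(self):
        """Drive root-token generation to completion and check the new token works.

        The new token replaces ``self.manager.root_token`` on success. If it
        cannot authenticate, the client's token is restored to the original
        before the test is failed so the remaining tests are not left with a
        dead credential.
        """
        test_otp = utils.get_generate_root_otp()
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])
        start_generate_root_response = self.client.sys.start_root_token_generation(
            otp=test_otp,
        )
        # Log only the nonce, which is the one field a failure needs to be
        # correlated; the raw start response is not needed for debugging.
        nonce = start_generate_root_response["nonce"]
        logging.debug("root generation started, nonce: %s", nonce)
        self.assertTrue(self.client.sys.read_root_generation_progress()["started"])

        # assumes the test cluster's generate-root threshold is 3 keys -- TODO confirm
        last_generate_root_response = {}
        for key in self.manager.keys[0:3]:
            last_generate_root_response = self.client.sys.generate_root(
                key=key,
                nonce=nonce,
            )
        # Deliberately NOT logged: the final response carries
        # ``encoded_root_token``, which anyone who also sees the OTP can turn
        # back into the root token.
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])

        new_root_token = utils.decode_generated_root_token(
            encoded_token=last_generate_root_response["encoded_root_token"],
            otp=test_otp,
            url=self.client.url,
        )
        logging.debug("decoded new root token; value intentionally not logged")
        # The lookup confirms Vault knows the token; hvac raises on an error
        # response, so a failure surfaces as an exception here. Its response
        # is not logged because it may echo the token value back.
        self.client.lookup_token(token=new_root_token)

        # Switch the client to the new token; on failure restore the original
        # first so later tests keep a usable credential.
        self.client.token = new_root_token
        if self.client.is_authenticated():
            self.manager.root_token = new_root_token
        else:
            self.client.token = self.manager.root_token
            self.fail("Unable to authenticate with the newly generated root token.")

    def test_start_generate_root_then_cancel(self):
        """Start root-token generation and confirm cancelling clears ``started``."""
        test_otp = utils.get_generate_root_otp()
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])
        self.client.sys.start_root_token_generation(
            otp=test_otp,
        )
        self.assertTrue(self.client.sys.read_root_generation_progress()["started"])
        self.client.sys.cancel_root_generation()
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])

    def test_rotate(self):
        """Rotating the encryption key must advance the key term."""
        status_before = self.client.key_status
        self.client.sys.rotate_encryption_key()
        self.assertGreater(
            self.client.key_status["term"],
            status_before["term"],
        )

    def test_rekey_multi(self):
        """Start/cancel a rekey, then complete one and adopt the new unseal keys.

        The replacement keys are stored on the class-level manager (via
        ``type(self)``) so every later test, and teardown, unseals with the
        keys Vault now expects.
        """
        cls = type(self)
        # A started rekey can be cancelled cleanly.
        self.assertFalse(self.client.rekey_status["started"])
        self.client.sys.start_rekey()
        self.assertTrue(self.client.rekey_status["started"])
        self.client.sys.cancel_rekey()
        self.assertFalse(self.client.rekey_status["started"])

        # Run a full rekey with the current operator keys. The new keys are
        # secrets and must not be logged.
        start_result = self.client.sys.start_rekey()
        result = self.client.sys.rekey_multi(
            cls.manager.keys, nonce=start_result["nonce"]
        )
        self.assertTrue(result["complete"])
        cls.manager.keys = result["keys"]
        cls.manager.unseal()

    def test_rekey_verify_multi(self):
        """Rekey with ``require_verification`` and complete the verification step.

        Also checks that cancelling the verification issues a fresh nonce, so a
        stale nonce from before the cancel cannot be replayed.
        """
        cls = type(self)
        # Start a rekey that requires verification and submit the operator keys.
        self.assertFalse(self.client.sys.read_rekey_progress()["started"])
        start_result = self.client.sys.start_rekey(require_verification=True)
        result = self.client.sys.rekey_multi(
            cls.manager.keys, nonce=start_result["nonce"]
        )
        self.assertTrue(result["complete"])
        cls.manager.keys = result["keys"]

        # Capture the initial verification nonce, cancel, and confirm a new
        # nonce was issued.
        first_nonce = self.client.sys.read_rekey_verify_progress()["nonce"]
        cancel_result = self.client.sys.cancel_rekey_verify()
        second_nonce = cancel_result["nonce"]
        self.assertNotEqual(first_nonce, second_nonce)

        # Complete verification against the post-cancel nonce, then unseal
        # with the keys Vault now expects.
        result = self.client.sys.rekey_verify_multi(
            cls.manager.keys, nonce=second_nonce
        )
        self.assertTrue(result["complete"])
        cls.manager.unseal()

    def test_get_backed_up_keys(self):
        """Reading backed-up keys when none exist must raise ``InvalidRequest``."""
        with self.assertRaises(exceptions.InvalidRequest) as cm:
            self.client.sys.read_backup_keys()
        self.assertEqual(
            first="no backed-up keys found",
            second=str(cm.exception),
        )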