forked from Tribler/py-ipv8
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_dht_endpoint.py
209 lines (169 loc) · 9.78 KB
/
test_dht_endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from hashlib import sha1
from base64 import b64encode, b64decode
from collections import deque
from twisted.internet.defer import inlineCallbacks
from .rest_peer_communication import HTTPGetRequesterDHT
from ...attestation.trustchain.test_block import TestBlock
from ...mocking.rest.base import RESTTestBase
from ...mocking.rest.rest_peer_communication import string_to_url
from ...mocking.rest.rest_api_peer import RestTestPeer
from ...mocking.rest.comunities import TestDHTCommunity, TestTrustchainCommunity
from ....attestation.trustchain.payload import HalfBlockPayload, DHTBlockPayload
from ....attestation.trustchain.community import TrustChainCommunity
from ....dht.community import DHTCommunity
from ....REST.dht_endpoint import DHTBlockEndpoint
from ....messaging.serialization import Serializer
class TestDHTEndpoint(RESTTestBase):
    """
    Class for testing the DHT Endpoint in the REST API of the IPv8 object
    """

    def setUp(self):
        """
        Prepare the test environment and the serializer shared by the tests.

        The environment is initialized with the peer specification ``[(2, RestTestPeer)]``; the tests below
        address the resulting peers as ``self.nodes[0]`` and ``self.nodes[1]``.
        """
        super(TestDHTEndpoint, self).setUp()
        self.initialize([(2, RestTestPeer)], HTTPGetRequesterDHT(), None)
        self.serializer = Serializer()

    def create_new_peer(self, peer_cls, port, *args, **kwargs):
        """
        Create a new peer which runs the DHT and the TrustChain test communities

        :param peer_cls: the class of the peer to be created
        :param port: the port passed on to the peer creation routine
        :param args: additional positional arguments, forwarded unchanged
        :param kwargs: additional keyword arguments, forwarded unchanged
        :return: None
        """
        self._create_new_peer_inner(peer_cls, port, [TestDHTCommunity, TestTrustchainCommunity], *args, **kwargs)

    @inlineCallbacks
    def publish_to_DHT(self, peer, key, data, numeric_version):
        """
        Publish data to the DHT via a peer. The data is split into chunks of at most
        DHTBlockEndpoint.CHUNK_SIZE, and each chunk is signed and stored separately under the same key.

        :param peer: the peer via which the data is published to the DHT
        :param key: the key of the added data
        :param data: the data itself; should be a byte string, since it is concatenated with the encoded version
        :param numeric_version: the version of the data
        :return: None
        """
        my_private_key = peer.get_keys()['my_peer'].key
        dht_overlay = peer.get_overlay_by_class(DHTCommunity)
        # The version prefix of the signed message is identical for every chunk
        version_prefix = str(numeric_version).encode('utf-8')

        for offset in range(0, len(data), DHTBlockEndpoint.CHUNK_SIZE):
            chunk = data[offset: offset + DHTBlockEndpoint.CHUNK_SIZE]
            signature = my_private_key.signature(version_prefix + chunk)
            packed_chunk = self.serializer.pack_multiple(
                DHTBlockPayload(signature, numeric_version, chunk).to_pack_list())
            # Only the first element of the pack_multiple result is stored
            # (presumably the packed blob - confirm against Serializer.pack_multiple)
            yield dht_overlay.store_value(key, packed_chunk[0])

    def deserialize_payload(self, serializables, data):
        """
        Deserialize data

        :param serializables: the list of serializable formats
        :param data: the serialized data
        :return: The payload obtained from deserializing the data
        """
        unpacked = self.serializer.unpack_to_serializables(serializables, data)
        # The trailing element is dropped before picking the first payload
        # (presumably it holds the leftover unparsed data - confirm against Serializer.unpack_to_serializables)
        payloads = unpacked[:-1]
        return payloads[0]

    def _increase_request_limit(self, new_request_limit):
        """
        Replace the query history of the routing table entries of every node with an empty deque which is
        bounded to the new request limit

        :param new_request_limit: the maximum length of the new query history deques
        :return: None
        """
        for node in self.nodes:
            routing_table = node.get_overlay_by_class(DHTCommunity).routing_table
            for other in routing_table.closest_nodes(routing_table.my_node_id):
                if other != node:
                    routing_table.get(other.id).last_queries = deque(maxlen=new_request_limit)

    def _dht_block_request_params(self, requester, publisher):
        """
        Build the parameters of a 'dht/block' REST request

        :param requester: the peer whose REST API receives the request
        :param publisher: the peer whose public key is looked up
        :return: a dictionary holding the request parameters
        """
        publisher_pk = publisher.get_keys()['my_peer'].public_key.key_to_bin()
        return {
            'port': requester.port,
            'interface': requester.interface,
            'endpoint': 'dht/block',
            'public_key': string_to_url(b64encode(publisher_pk))
        }

    def _dht_block_key(self, peer):
        """
        Compute the DHT key under which the tests publish the blocks of a peer: the SHA-1 digest of the
        peer's public key followed by DHTBlockEndpoint.KEY_SUFFIX

        :param peer: the peer whose public key is used
        :return: the digest used as DHT key
        """
        return sha1(peer.get_keys()['my_peer'].public_key.key_to_bin() + DHTBlockEndpoint.KEY_SUFFIX).digest()

    def _reconstruct_block_from_response(self, response):
        """
        Check that a 'dht/block' response holds a block and rebuild that block

        :param response: the response of the REST API; expected to hold a non-empty 'block' entry
        :return: the block reconstructed from the base64 encoded 'block' entry
        """
        self.assertTrue('block' in response and response['block'], "Response is not as expected: %s" % response)
        raw_block = b64decode(response['block'])

        # Reconstruct the block from what was received in the response
        payload = self.deserialize_payload((HalfBlockPayload, ), raw_block)
        block_class = self.nodes[0].get_overlay_by_class(TrustChainCommunity).get_block_class(payload.type)
        return block_class.from_payload(payload, self.serializer)

    @inlineCallbacks
    def test_added_block_explicit(self):
        """
        Test the publication of a block which has been added by hand to the DHT
        """
        param_dict = self._dht_block_request_params(self.nodes[0], self.nodes[0])

        # Introduce the nodes
        yield self.introduce_nodes(DHTCommunity)

        # Manually publish a block to the DHT
        original_block = TestBlock()
        hash_key = self._dht_block_key(self.nodes[0])
        yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536)

        # Get the block through the REST API
        response = yield self._get_style_requests.make_dht_block(param_dict)
        reconstructed_block = self._reconstruct_block_from_response(response)

        self.assertEqual(reconstructed_block, original_block, "The received block was not the one which was expected")

    @inlineCallbacks
    def test_added_block_implicit(self):
        """
        Test the publication of a block which has been added implicitly to the DHT
        """
        param_dict = self._dht_block_request_params(self.nodes[1], self.nodes[0])

        # Introduce the nodes
        yield self.introduce_nodes(DHTCommunity)

        # Create a block in the TrustChain of the first peer; no explicit DHT publication is done by this test
        trustchain = self.nodes[0].get_overlay_by_class(TrustChainCommunity)
        publisher_pk = trustchain.my_peer.public_key.key_to_bin()
        yield trustchain.create_source_block(b'test', {})
        original_block = trustchain.persistence.get(publisher_pk, 1)
        yield self.deliver_messages()
        yield self.sleep()

        # Get the block through the REST API
        response = yield self._get_style_requests.make_dht_block(param_dict)
        reconstructed_block = self._reconstruct_block_from_response(response)

        self.assertEqual(reconstructed_block, original_block, "The received block was not the one which was expected")

    @inlineCallbacks
    def test_latest_block(self):
        """
        Test the retrieval of the latest block via the DHT, when there is
        more than one block in the DHT under the same key
        """
        param_dict = self._dht_block_request_params(self.nodes[1], self.nodes[0])

        # Introduce the nodes and increase the size of the request queues
        yield self.introduce_nodes(DHTCommunity)
        self._increase_request_limit(20)

        # Manually create two blocks with different transactions
        original_block_1 = TestBlock(transaction={1: 'asd'})
        original_block_2 = TestBlock(transaction={1: 'mmm'})
        hash_key = self._dht_block_key(self.nodes[0])

        # Publish the two blocks under the same key in the first peer
        yield self.publish_to_DHT(self.nodes[0], hash_key, original_block_1.pack(), 4536)
        yield self.publish_to_DHT(self.nodes[0], hash_key, original_block_2.pack(), 7636)

        # Get the block through the REST API from the second peer
        response = yield self._get_style_requests.make_dht_block(param_dict)
        reconstructed_block = self._reconstruct_block_from_response(response)

        self.assertEqual(reconstructed_block, original_block_2, "The received block was not equal to the latest block")
        self.assertNotEqual(reconstructed_block, original_block_1, "The received block was equal to the older block")

    @inlineCallbacks
    def test_block_duplication(self):
        """
        Test that a block which has already been published in the DHT will not be republished again; i.e. no
        duplicate blocks in the DHT under different (embedded) versions.
        """
        # Introduce the nodes
        yield self.introduce_nodes(DHTCommunity)

        trustchain = self.nodes[0].get_overlay_by_class(TrustChainCommunity)
        dht_storage = self.nodes[0].get_overlay_by_class(DHTCommunity).storage

        # Manually create and add a block to the TrustChain
        original_block = TestBlock(key=self.nodes[0].get_keys()['my_peer'].key)
        trustchain.persistence.add_block(original_block)

        # Publish the block to the DHT
        # NOTE(review): this key is derived from the peer's mid, whereas the other tests in this class derive it
        # from the full public key - confirm which one DHTBlockEndpoint uses when it publishes blocks itself
        hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest()

        result = dht_storage.get(hash_key)
        self.assertEqual(result, [], "There shouldn't be any blocks for this key")

        yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536)

        result = dht_storage.get(hash_key)
        self.assertNotEqual(result, [], "There should be at least one chunk for this key")

        chunk_number = len(result)

        # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT
        trustchain.notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER))

        # Query the DHT again
        result = dht_storage.get(hash_key)
        self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen")