From e734e215d9d399d5c97d54df0b495dd28f21d57a Mon Sep 17 00:00:00 2001 From: DanGraur Date: Sat, 3 Nov 2018 00:50:32 +0100 Subject: [PATCH 1/3] Added a new endpoint for retrieving the latest TrustChain block via the DHT mechanism. Added a wildcard listener in the listener_map (of the TrustChainCommunity class) whose associated listeners are called regardless of block type. This wildcard is used to publish newly added to the TrustChainDB blocks to the DHT. The callback for this mechanism also defines behavior which disallows the same block from being published twice to the DHT. It is now possible to retrieve the latest TrustChain block of some peer via a HTTP request. Created a new test set for the newly added block retrieval mechanism. In addition to this, made a few changes to the REST API test suite in terms of the communication modules, such as moving classes from one module to another and creating new modules which host logic for HTTP requests and communication for specific endpoints. --- ipv8/REST/dht_endpoint.py | 131 +++++- ipv8/attestation/trustchain/community.py | 8 +- .../test/REST/attestationendpoint/__init__.py | 0 .../peer_communication.py | 7 - .../rest_peer_communication.py | 390 ++++++++++++++++++ .../test_attestation_endpoint.py | 14 +- ipv8/test/REST/dht/__init__.py | 0 ipv8/test/REST/dht/rest_peer_communication.py | 43 ++ ipv8/test/REST/dht/test_dht_endpoint.py | 194 +++++++++ ipv8/test/mocking/rest/comunities.py | 27 ++ ipv8/test/mocking/rest/ipv8.py | 6 +- .../mocking/rest/rest_peer_communication.py | 365 +--------------- test_classes_list.txt | 4 +- 13 files changed, 805 insertions(+), 384 deletions(-) create mode 100644 ipv8/test/REST/attestationendpoint/__init__.py rename ipv8/test/{mocking/rest => REST/attestationendpoint}/peer_communication.py (96%) create mode 100644 ipv8/test/REST/attestationendpoint/rest_peer_communication.py rename ipv8/test/REST/{ => attestationendpoint}/test_attestation_endpoint.py (98%) create mode 100644 ipv8/test/REST/dht/__init__.py create mode 100644 ipv8/test/REST/dht/rest_peer_communication.py create mode 100644 ipv8/test/REST/dht/test_dht_endpoint.py diff --git a/ipv8/REST/dht_endpoint.py b/ipv8/REST/dht_endpoint.py index 4a825ab7f..27877d2dd 100644 --- a/ipv8/REST/dht_endpoint.py +++ b/ipv8/REST/dht_endpoint.py @@ -1,14 +1,19 @@ from __future__ import absolute_import -from base64 import b64encode +from base64 import b64encode, b64decode from binascii import hexlify, unhexlify +from hashlib import sha1 import json +import struct +from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web import http, resource from twisted.web.server import NOT_DONE_YET -from ..dht.community import DHTCommunity +from ..dht.community import DHTCommunity, MAX_ENTRY_SIZE +from ..attestation.trustchain.community import TrustChainCommunity from ..dht.discovery import DHTDiscoveryCommunity +from ..attestation.trustchain.listener import BlockListener class DHTEndpoint(resource.Resource): @@ -20,10 +25,126 @@ def __init__(self, session): resource.Resource.__init__(self) dht_overlays = [overlay for overlay in session.overlays if isinstance(overlay, DHTCommunity)] + tc_overlays = [overlay for overlay in session.overlays if isinstance(overlay, TrustChainCommunity)] if dht_overlays: - self.putChild("statistics", DHTStatisticsEndpoint(dht_overlays[0])) - self.putChild("values", DHTValuesEndpoint(dht_overlays[0])) - self.putChild("peers", DHTPeersEndpoint(dht_overlays[0])) + self.putChild(b"statistics", DHTStatisticsEndpoint(dht_overlays[0])) + self.putChild(b"values", DHTValuesEndpoint(dht_overlays[0])) + self.putChild(b"peers", DHTPeersEndpoint(dht_overlays[0])) + self.putChild(b"block", DHTBlockEndpoint(dht_overlays[0], tc_overlays[0])) + + +class DHTBlockEndpoint(resource.Resource, BlockListener): + """ + This endpoint is responsible for returning the latest Trustchain block of a peer. Additionally, it ensures + this peer's latest TC block is available + """ + + def received_block(self, block): + """ + Wrapper callback method, inherited from the BlockListener abstract class, which will publish the latest + TrustChain block to the DHT + + :param block: the latest block added to the Database. This is not actually used by the inner method + :return: None + """ + self.publish_latest_block() + + def should_sign(self, block): + pass + + KEY_SUFFIX = b'_BLOCK' + + def __init__(self, dht, trustchain): + resource.Resource.__init__(self) + self.dht = dht + self.trustchain = trustchain + self.block_version = 0 + + self._hashed_dht_key = sha1(self.trustchain.my_peer.mid + self.KEY_SUFFIX).digest() + + trustchain.add_listener(self, [trustchain.UNIVERSAL_BLOCK_LISTENER]) + + def reconstruct_all_blocks(self, block_chunks): + """ + Given a list of block chunks, reconstruct all the blocks in a dictionary indexed by their version + + :param block_chunks: the list of block chunks + :return: a dictionary of reconstructed blocks (in packed format), indexed by the version of the blocks, + and the maximal version + """ + new_blocks = {} + max_version = 0 + + for entry in block_chunks: + this_version = struct.unpack("H", entry[1:3])[0] + max_version = max_version if max_version > this_version else this_version + + new_blocks[this_version] = entry[3:] + new_blocks[this_version] if this_version in new_blocks else entry[3:] + + return new_blocks, max_version + + def _is_duplicate(self, latest_block): + """ + Checks to see if this block has already been published to the DHT + + :param latest_block: the PACKED version of the latest block + :return: True if the block has indeed been published before, False otherwise + """ + block_chunks = self.dht.storage.get(self._hashed_dht_key) + new_blocks, _ = self.reconstruct_all_blocks(block_chunks) + + for val in new_blocks.values(): + if val == latest_block: + return True + + return False + + @inlineCallbacks + def publish_latest_block(self): + """ + Publish the latest block of this node's TrustChain to the DHT + """ + latest_block = self.trustchain.persistence.get_latest(self.trustchain.my_peer.public_key.key_to_bin()) + + if latest_block: + # Get all the previously published blocks for this peer from the DHT, and check if this is a duplicate + latest_block = latest_block.pack() + if self._is_duplicate(latest_block): + returnValue(None) + + version = struct.pack("H", self.block_version) + self.block_version += 1 + + for i in range(0, len(latest_block), MAX_ENTRY_SIZE - 3): + blob_chunk = version + latest_block[i:i + MAX_ENTRY_SIZE - 3] + yield self.dht.store_value(self._hashed_dht_key, blob_chunk) + + def render_GET(self, request): + """ + Return the latest TC block of a peer, as identified in the request + + :param request: the request for retrieving the latest TC block of a peer. It must contain the peer's + public key of the peer + :return: the latest block of the peer, if found + """ + if not self.dht: + request.setResponseCode(http.NOT_FOUND) + return json.dumps({"error": "DHT community not found"}).encode('utf-8') + + if not request.args or b'public_key' not in request.args: + request.setResponseCode(http.BAD_REQUEST) + return json.dumps({"error": "Must specify the peer's public key"}).encode('utf-8') + + hash_key = sha1(b64decode(request.args[b'public_key'][0]) + self.KEY_SUFFIX).digest() + block_chunks = self.dht.storage.get(hash_key) + + if not block_chunks: + request.setResponseCode(http.NOT_FOUND) + return json.dumps({"error": "Could not find a block for the specified key."}).encode('utf-8') + + new_blocks, max_version = self.reconstruct_all_blocks(block_chunks) + + return json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8') class DHTStatisticsEndpoint(resource.Resource): diff --git a/ipv8/attestation/trustchain/community.py b/ipv8/attestation/trustchain/community.py index 0d480001a..0b00202b9 100644 --- a/ipv8/attestation/trustchain/community.py +++ b/ipv8/attestation/trustchain/community.py @@ -48,6 +48,7 @@ class TrustChainCommunity(Community): master_peer = Peer(unhexlify("4c69624e61434c504b3a5730f52156615ecbcedb36c442992ea8d3c26b418edd8bd00e01dce26028cd" "1ebe5f7dce59f4ed59f8fcee268fd7f1c6dc2fa2af8c22e3170e00cdecca487745")) + UNIVERSAL_BLOCK_LISTENER = 'UNIVERSAL_BLOCK_LISTENER' DB_CLASS = TrustChainDB DB_NAME = 'trustchain' version = b'\x02' @@ -349,7 +350,12 @@ def notify_listeners(self, block): """ Notify listeners of a specific new block. """ - if block.type not in self.listeners_map or self.shutting_down: + # Call the listeners associated to the universal block, if there are any + for listener in self.listeners_map.get(self.UNIVERSAL_BLOCK_LISTENER, []): + listener.received_block(block) + + # Avoid proceeding any further if the type of the block coincides with the UNIVERSAL_BLOCK_LISTENER + if block.type not in self.listeners_map or self.shutting_down or block.type == self.UNIVERSAL_BLOCK_LISTENER: return for listener in self.listeners_map[block.type]: diff --git a/ipv8/test/REST/attestationendpoint/__init__.py b/ipv8/test/REST/attestationendpoint/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ipv8/test/mocking/rest/peer_communication.py b/ipv8/test/REST/attestationendpoint/peer_communication.py similarity index 96% rename from ipv8/test/mocking/rest/peer_communication.py rename to ipv8/test/REST/attestationendpoint/peer_communication.py index 5b7cbbd54..6a059c92f 100644 --- a/ipv8/test/mocking/rest/peer_communication.py +++ b/ipv8/test/REST/attestationendpoint/peer_communication.py @@ -1,4 +1,3 @@ - from abc import abstractmethod @@ -111,9 +110,3 @@ def make_outstanding_verify(self, param_dict): :return: None """ pass - - -class RequestException(Exception): - """ - Custom exception used to model request errors - """ diff --git a/ipv8/test/REST/attestationendpoint/rest_peer_communication.py b/ipv8/test/REST/attestationendpoint/rest_peer_communication.py new file mode 100644 index 000000000..c54aa7bb5 --- /dev/null +++ b/ipv8/test/REST/attestationendpoint/rest_peer_communication.py @@ -0,0 +1,390 @@ +from twisted.internet.defer import inlineCallbacks, returnValue + +from .peer_communication import IGetStyleRequestsAE, IPostStyleRequestsAE +from ...mocking.rest.rest_peer_communication import HTTPRequester, RequestException, process_json_response + + +class HTTPGetRequesterAE(IGetStyleRequestsAE, HTTPRequester): + """ + Implements the GetStyleRequests abstract methods using the HTTP protocol for the attestation endpoint. + """ + + def __init__(self): + IGetStyleRequestsAE.__init__(self) + HTTPRequester.__init__(self) + + @process_json_response + @inlineCallbacks + def make_outstanding(self, param_dict): + """ + Forward a request for outstanding attestation requests. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + {'type': 'outstanding'}, + param_dict.get('callback', None)) + returnValue(response) + + @process_json_response + @inlineCallbacks + def make_verification_output(self, param_dict): + """ + Forward a request for the verification outputs. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + {'type': 'verification_output'}, + param_dict.get('callback', None)) + returnValue(response) + + @process_json_response + @inlineCallbacks + def make_peers(self, param_dict): + """ + Forward a request for the known peers in the network. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + {'type': 'peers'}, + param_dict.get('callback', None)) + returnValue(response) + + @process_json_response + @inlineCallbacks + def make_attributes(self, param_dict): + """ + Forward a request for the attributes of a peer. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (attributes), and the (optional) b64_mid of the attester + request_parameters = param_dict.get('request_parameters', dict()) + request_parameters.update({'type': 'attributes'}) + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) + + @process_json_response + @inlineCallbacks + def make_dht_block(self, param_dict): + """ + Forward a request for the latest TC block of a peer + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'public_key': the public key of the peer whose latest TC block is being requested + (optional) 'callback': single parameter callback for the request's response + } + :return: None + :raises RequestException: raised when the method could not find one of the required pieces of information + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + request_parameters = {} + + if 'public_key' in param_dict: + request_parameters['public_key'] = param_dict['public_key'] + else: + raise RequestException("Malformed request: did not specify the public_key") + + response = yield self.make_request("http://{0}:{1}/{2}".format(interface, port, endpoint), + 'GET', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) + + @inlineCallbacks + def make_drop_identity(self, param_dict): + """ + Forward a request for dropping a peer's identity. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + {'type': 'drop_identity'}, + param_dict.get('callback', None)) + returnValue(response) + + @process_json_response + @inlineCallbacks + def make_outstanding_verify(self, param_dict): + """ + Forward a request which requests information on the outstanding verify requests + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (request), and the rest of the parameters + request_parameters = {'type': 'outstanding_verify'} + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'GET', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) + + +class HTTPPostRequesterAE(IPostStyleRequestsAE, HTTPRequester): + """ + Implements the PostStyleRequests abstract methods using the HTTP protocol for the AttestationEndpoint + """ + + def __init__(self): + IPostStyleRequestsAE.__init__(self) + HTTPRequester.__init__(self) + + @inlineCallbacks + def make_attestation_request(self, param_dict): + """ + Forward a request for the attestation of an attribute. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'attribute_name': attribute_name + 'mid': attester b64_mid + (optional) 'metadata': JSON style metadata required for the attestation process + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (request), and the rest of the parameters + request_parameters = {'type': 'request'} + + # Add the request parameters one-by-one; if required parameter is missing, then raise error + if 'attribute_name' in param_dict: + request_parameters['attribute_name'] = param_dict['attribute_name'] + else: + raise RequestException("Malformed request: did not specify the attribute_name") + + if 'mid' in param_dict: + request_parameters['mid'] = param_dict['mid'] + else: + raise RequestException("Malformed request: did not specify the attester's mid") + + if 'metadata' in param_dict: + request_parameters['metadata'] = param_dict['metadata'] + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'POST', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) + + @inlineCallbacks + def make_attest(self, param_dict): + """ + Forward a request which attests an attestation request. + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'attribute_name': attribute_name + 'mid': attestee's b64_mid + 'attribute_value': b64 hash of the attestation blob + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (attest), and the rest of the parameters + request_parameters = {'type': 'attest'} + + # Add the request parameters one-by-one; if required parameter is missing, then raise error + if 'attribute_name' in param_dict: + request_parameters['attribute_name'] = param_dict['attribute_name'] + else: + raise RequestException("Malformed request: did not specify the attribute_name") + + if 'mid' in param_dict: + request_parameters['mid'] = param_dict['mid'] + else: + raise RequestException("Malformed request: did not specify the attestee's mid") + + if 'attribute_value' in param_dict: + request_parameters['attribute_value'] = param_dict['attribute_value'] + else: + raise RequestException("Malformed request: did not specify the attribute_value, i.e. the attestation" + "blob hash") + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'POST', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) + + @inlineCallbacks + def make_verify(self, param_dict): + """ + Forward a request which demands the verification of an attestation + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'attribute_hash': the b64 hash of the attestation blob which needs to be verified + 'mid': verifier's b64_mid + 'attribute_values': a string of b64 encoded values, which are separated by ',' characters + e.g. "val_1,val_2,val_3, ..., val_N" + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (attest), and the rest of the parameters + request_parameters = {'type': 'verify'} + + # Add the request parameters one-by-one; if required parameter is missing, then raise error + if 'attribute_hash' in param_dict: + request_parameters['attribute_hash'] = param_dict['attribute_hash'] + else: + raise RequestException("Malformed request: did not specify the attribute_hash") + + if 'mid' in param_dict: + request_parameters['mid'] = param_dict['mid'] + else: + raise RequestException("Malformed request: did not specify the verifier's mid") + + if 'attribute_values' in param_dict: + request_parameters['attribute_values'] = param_dict['attribute_values'] + else: + raise RequestException("Malformed request: did not specify the attribute_values") + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'POST', + request_parameters, + param_dict.get('callback', None)) + + returnValue(response) + + @inlineCallbacks + def make_allow_verify(self, param_dict): + """ + Forward a request which requests that verifications be allowed for a particular peer for a particular attribute + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'attribute_name': attribute_name + 'mid': verifier's b64_mid + (optional) 'callback': single parameter callback for the request's response + } + :return: the request's response + :raises RequestException: raised when the method could not find some element required for the construction of + the request + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + + # Add the type of the request (request), and the rest of the parameters + request_parameters = {'type': 'allow_verify'} + + # Add the request parameters one-by-one; if required parameter is missing, then raise error + if 'attribute_name' in param_dict: + request_parameters['attribute_name'] = param_dict['attribute_name'] + else: + raise RequestException("Malformed request: did not specify the attribute_name") + + if 'mid' in param_dict: + request_parameters['mid'] = param_dict['mid'] + else: + raise RequestException("Malformed request: did not specify the attester's mid") + + response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), + 'POST', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) diff --git a/ipv8/test/REST/test_attestation_endpoint.py b/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py similarity index 98% rename from ipv8/test/REST/test_attestation_endpoint.py rename to ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py index 2a6fd6404..c3b91f2a4 100644 --- a/ipv8/test/REST/test_attestation_endpoint.py +++ b/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py @@ -3,13 +3,13 @@ from json import dumps from twisted.internet.defer import returnValue, inlineCallbacks -from ..mocking.rest.base import RESTTestBase -from ..mocking.rest.peer_interactive_behavior import RequesterRestTestPeer -from ..mocking.rest.rest_peer_communication import string_to_url -from ..mocking.rest.rest_api_peer import RestTestPeer -from ..mocking.rest.rest_peer_communication import HTTPGetRequesterAE, HTTPPostRequesterAE -from ..mocking.rest.comunities import TestAttestationCommunity, TestIdentityCommunity -from ...attestation.identity.community import IdentityCommunity +from .rest_peer_communication import HTTPGetRequesterAE, HTTPPostRequesterAE +from ...mocking.rest.base import RESTTestBase +from ...mocking.rest.peer_interactive_behavior import RequesterRestTestPeer +from ...mocking.rest.rest_peer_communication import string_to_url +from ...mocking.rest.rest_api_peer import RestTestPeer +from ...mocking.rest.comunities import TestAttestationCommunity, TestIdentityCommunity +from ....attestation.identity.community import IdentityCommunity class TestAttestationEndpoint(RESTTestBase): diff --git a/ipv8/test/REST/dht/__init__.py b/ipv8/test/REST/dht/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ipv8/test/REST/dht/rest_peer_communication.py b/ipv8/test/REST/dht/rest_peer_communication.py new file mode 100644 index 000000000..22225c515 --- /dev/null +++ b/ipv8/test/REST/dht/rest_peer_communication.py @@ -0,0 +1,43 @@ +from twisted.internet.defer import inlineCallbacks, returnValue + +from ...mocking.rest.rest_peer_communication import HTTPRequester, RequestException, process_json_response + + +class HTTPGetRequesterDHT(HTTPRequester): + """ + Implements the HTTP GET type requests for the DHT endpoint + """ + + def __init__(self): + HTTPRequester.__init__(self) + + @process_json_response + @inlineCallbacks + def make_dht_block(self, param_dict): + """ + Forward a request for the latest TC block of a peer + + :param param_dict: Should have at least the following structure: + { + 'interface': target peer IP or alias + 'port': port_number + 'endpoint': endpoint_name + 'public_key': the public key of the peer whose latest TC block is being requested + (optional) 'callback': single parameter callback for the request's response + } + :return: None + :raises RequestException: raised when the method could not find one of the required pieces of information + """ + interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) + request_parameters = {} + + if 'public_key' in param_dict: + request_parameters['public_key'] = param_dict['public_key'] + else: + raise RequestException("Malformed request: did not specify the public_key") + + response = yield self.make_request("http://{0}:{1}/{2}".format(interface, port, endpoint), + 'GET', + request_parameters, + param_dict.get('callback', None)) + returnValue(response) diff --git a/ipv8/test/REST/dht/test_dht_endpoint.py b/ipv8/test/REST/dht/test_dht_endpoint.py new file mode 100644 index 000000000..a3a131b51 --- /dev/null +++ b/ipv8/test/REST/dht/test_dht_endpoint.py @@ -0,0 +1,194 @@ +import struct + +from hashlib import sha1 +from base64 import b64encode, b64decode +from twisted.internet.defer import inlineCallbacks + +from .rest_peer_communication import HTTPGetRequesterDHT +from ...attestation.trustchain.test_block import TestBlock +from ...mocking.rest.base import RESTTestBase +from ...mocking.rest.rest_peer_communication import string_to_url +from ...mocking.rest.rest_api_peer import RestTestPeer +from ...mocking.rest.comunities import TestDHTCommunity, TestTrustchainCommunity +from ....attestation.trustchain.payload import HalfBlockPayload +from ....attestation.trustchain.community import TrustChainCommunity +from ....dht.community import DHTCommunity, MAX_ENTRY_SIZE +from ....REST.dht_endpoint import DHTBlockEndpoint +from ....messaging.serialization import Serializer + + +class TestDHTEndpoint(RESTTestBase): + """ + Class for testing the DHT Endpoint in the REST API of the IPv8 object + """ + + def setUp(self): + super(TestDHTEndpoint, self).setUp() + self.initialize([(2, RestTestPeer)], HTTPGetRequesterDHT(), None) + self.serializer = Serializer() + + def create_new_peer(self, peer_cls, port, *args, **kwargs): + self._create_new_peer_inner(peer_cls, port, [TestDHTCommunity, TestTrustchainCommunity], *args, **kwargs) + + @inlineCallbacks + def publish_to_DHT(self, peer, key, data, numeric_version): + """ + Publish data to the DHT via a peer + + :param peer: the peer via which the data is published to the DHT + :param key: the key of the added data + :param data: the data itself; should be a string + :param numeric_version: the version of the data + :return: None + """ + version = struct.pack("H", numeric_version) + + for i in range(0, len(data), MAX_ENTRY_SIZE - 3): + blob_chunk = version + data[i:i + MAX_ENTRY_SIZE - 3] + yield peer.get_overlay_by_class(DHTCommunity).store_value(key, blob_chunk) + + def deserialize_payload(self, serializables, data): + """ + Deserialize data + + :param serializables: the list of serializable formats + :param data: the serialized data + :return: The payload obtained from deserializing the data + """ + payload = self.serializer.unpack_to_serializables(serializables, data) + return payload[:-1][0] + + @inlineCallbacks + def test_added_block_explicit(self): + """ + Test the publication of a block which has been added by hand to the DHT + """ + param_dict = { + 'port': self.nodes[0].port, + 'interface': self.nodes[0].interface, + 'endpoint': 'dht/block', + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + } + # Introduce the nodes + yield self.introduce_nodes(DHTCommunity) + + # Manually add a block to the Trustchain + original_block = TestBlock() + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + + yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536) + + # Get the block through the REST API + response = yield self._get_style_requests.make_dht_block(param_dict) + self.assertTrue('block' in response and response['block'], "Response is not as expected: %s" % response) + response = b64decode(response['block']) + + # Reconstruct the block from what was received in the response + payload = self.deserialize_payload((HalfBlockPayload, ), response) + reconstructed_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).get_block_class(payload.type) \ + .from_payload(payload, self.serializer) + + self.assertEqual(reconstructed_block, original_block, "The received block was not the one which was expected") + + @inlineCallbacks + def test_added_block_implicit(self): + """ + Test the publication of a block which has been added implicitly to the DHT + """ + param_dict = { + 'port': self.nodes[1].port, + 'interface': self.nodes[1].interface, + 'endpoint': 'dht/block', + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + } + # Introduce the nodes + yield self.introduce_nodes(DHTCommunity) + + publisher_pk = self.nodes[0].get_overlay_by_class(TrustChainCommunity).my_peer.public_key.key_to_bin() + + yield self.nodes[0].get_overlay_by_class(TrustChainCommunity).create_source_block(b'test', {}) + original_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).persistence.get(publisher_pk, 1) + yield self.deliver_messages() + + # Get the block through the REST API + response = yield self._get_style_requests.make_dht_block(param_dict) + self.assertTrue('block' in response and response['block'], "Response is not as expected: %s" % response) + response = b64decode(response['block']) + + # Reconstruct the block from what was received in the response + payload = self.deserialize_payload((HalfBlockPayload,), response) + reconstructed_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).get_block_class(payload.type)\ + .from_payload(payload, self.serializer) + + self.assertEqual(reconstructed_block, original_block, "The received block was not the one which was expected") + + @inlineCallbacks + def test_latest_block(self): + """ + Test the retrieval of the latest block via the DHT, when there is + more than one block in the DHT under the same key + """ + param_dict = { + 'port': self.nodes[1].port, + 'interface': self.nodes[1].interface, + 'endpoint': 'dht/block', + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + } + # Introduce the nodes + yield self.introduce_nodes(DHTCommunity) + + # Manually add a block to the Trustchain + original_block_1 = TestBlock(transaction={1: 'asd'}) + original_block_2 = TestBlock(transaction={1: 'mmm'}) + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + + # Publish the two blocks under the same key in the first peer + yield self.publish_to_DHT(self.nodes[0], hash_key, original_block_1.pack(), 4536) + yield self.publish_to_DHT(self.nodes[0], hash_key, original_block_2.pack(), 7636) + + # Get the block through the REST API from the second peer + response = yield self._get_style_requests.make_dht_block(param_dict) + self.assertTrue('block' in response and response['block'], "Response is not as expected: %s" % response) + response = b64decode(response['block']) + + # Reconstruct the block from what was received in the response + payload = self.deserialize_payload((HalfBlockPayload,), response) + reconstructed_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).get_block_class( + payload.type).from_payload(payload, self.serializer) + + self.assertEqual(reconstructed_block, original_block_2, "The received block was not equal to the latest block") + self.assertNotEqual(reconstructed_block, original_block_1, "The received block was equal to the older block") + + @inlineCallbacks + def test_block_duplication(self): + """ + Test that a block which has already been pubished in the DHT will not be republished again; i.e. no + duplicate blocks in the DHT under different (embedded) versions. + """ + # Introduce the nodes + yield self.introduce_nodes(DHTCommunity) + + # Manually create and add a block to the TrustChain + original_block = TestBlock(key=self.nodes[0].get_keys()['my_peer'].key) + self.nodes[0].get_overlay_by_class(TrustChainCommunity).persistence.add_block(original_block) + + # Publish the node to the DHT + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + + result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + self.assertEqual(result, [], "There shouldn't be any blocks for this key") + + yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536) + + result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + self.assertNotEqual(result, [], "There should be at least one chunk for this key") + + chunk_number = len(result) + + # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT + self.nodes[0].get_overlay_by_class(TrustChainCommunity) \ + .notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER)) + + # Query the DHT again + result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen") diff --git a/ipv8/test/mocking/rest/comunities.py b/ipv8/test/mocking/rest/comunities.py index 89ed1b711..812d1cbf2 100644 --- a/ipv8/test/mocking/rest/comunities.py +++ b/ipv8/test/mocking/rest/comunities.py @@ -1,5 +1,7 @@ from ....attestation.identity.community import IdentityCommunity from ....attestation.wallet.community import AttestationCommunity +from ....attestation.trustchain.community import TrustChainCommunity +from ....dht.community import DHTCommunity from ....keyvault.crypto import ECCrypto from ....peer import Peer @@ -10,3 +12,28 @@ class TestAttestationCommunity(AttestationCommunity): class TestIdentityCommunity(IdentityCommunity): master_peer = Peer(ECCrypto().generate_key(u'high')) + + +class TestDHTCommunity(DHTCommunity): + master_peer = Peer(ECCrypto().generate_key(u'high')) + + +class TestTrustchainCommunity(TrustChainCommunity): + master_peer = Peer(ECCrypto().generate_key(u'high')) + + +def overlay_initializer(overlay_class, my_peer, endpoint, network, working_directory): + """ + Wrapper class, which instantiates new overlay classes. + + :param overlay_class: the overlay's class + :param my_peer: the peer passed to the overlay + :param endpoint: the endpoint passed to the overlay + :param network: the network passer to the overlay + :param working_directory: the overlay's working directory + :return: an initialized object of the overlay_class type + """ + if issubclass(overlay_class, DHTCommunity): + return overlay_class(my_peer, endpoint, network) + + return overlay_class(my_peer, endpoint, network, working_directory=working_directory) diff --git a/ipv8/test/mocking/rest/ipv8.py b/ipv8/test/mocking/rest/ipv8.py index 90cf48e94..f1eb9a87a 100644 --- a/ipv8/test/mocking/rest/ipv8.py +++ b/ipv8/test/mocking/rest/ipv8.py @@ -2,7 +2,7 @@ from twisted.internet.task import LoopingCall -from .comunities import TestIdentityCommunity, TestAttestationCommunity +from .comunities import TestIdentityCommunity, TestAttestationCommunity, overlay_initializer from ....keyvault.crypto import ECCrypto from ....messaging.interfaces.udp.endpoint import UDPEndpoint from ....peer import Peer @@ -26,8 +26,8 @@ def __init__(self, crypto_curve, port, interface, overlay_classes, memory_dbs=Tr self.overlays = [] for overlay_class in overlay_classes: - self.overlays.append(overlay_class(my_peer, self.endpoint, self.network, - working_directory=database_working_dir)) + self.overlays.append(overlay_initializer(overlay_class, my_peer, self.endpoint, self.network, + working_directory=database_working_dir)) self.strategies = [ (RandomWalk(overlay), 20) for overlay in self.overlays diff --git a/ipv8/test/mocking/rest/rest_peer_communication.py b/ipv8/test/mocking/rest/rest_peer_communication.py index 50da3cb2f..740ab8a93 100644 --- a/ipv8/test/mocking/rest/rest_peer_communication.py +++ b/ipv8/test/mocking/rest/rest_peer_communication.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import logging from json import loads @@ -9,8 +7,6 @@ from twisted.web.client import Agent, readBody from twisted.web.http_headers import Headers -from .peer_communication import IGetStyleRequestsAE, RequestException, IPostStyleRequestsAE - def process_json_response(func): """ @@ -109,361 +105,6 @@ def basic_url_builder(interface, port, endpoint, protocol='http'): return "%s://%s:%d/%s" % (protocol, interface, port, endpoint) -class HTTPGetRequesterAE(IGetStyleRequestsAE, HTTPRequester): - """ - Implements the GetStyleRequests abstract methods using the HTTP protocol for the attestation endpoint - """ - - def __init__(self): - IGetStyleRequestsAE.__init__(self) - HTTPRequester.__init__(self) - - @process_json_response - @inlineCallbacks - def make_outstanding(self, param_dict): - """ - Forward a request for outstanding attestation requests. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - {'type': 'outstanding'}, - param_dict.get('callback', None)) - returnValue(response) - - @process_json_response - @inlineCallbacks - def make_verification_output(self, param_dict): - """ - Forward a request for the verification outputs. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - {'type': 'verification_output'}, - param_dict.get('callback', None)) - returnValue(response) - - @process_json_response - @inlineCallbacks - def make_peers(self, param_dict): - """ - Forward a request for the known peers in the network. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - {'type': 'peers'}, - param_dict.get('callback', None)) - returnValue(response) - - @process_json_response - @inlineCallbacks - def make_attributes(self, param_dict): - """ - Forward a request for the attributes of a peer. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (attributes), and the (optional) b64_mid of the attester - request_parameters = param_dict.get('request_parameters', dict()) - request_parameters.update({'type': 'attributes'}) - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - request_parameters, - param_dict.get('callback', None)) - returnValue(response) - - @inlineCallbacks - def make_drop_identity(self, param_dict): - """ - Forward a request for dropping a peer's identity. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - {'type': 'drop_identity'}, - param_dict.get('callback', None)) - returnValue(response) - - @process_json_response - @inlineCallbacks - def make_outstanding_verify(self, param_dict): - """ - Forward a request which requests information on the outstanding verify requests - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (request), and the rest of the parameters - request_parameters = {'type': 'outstanding_verify'} - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'GET', - request_parameters, - param_dict.get('callback', None)) - returnValue(response) - - -class HTTPPostRequesterAE(IPostStyleRequestsAE, HTTPRequester): - """ - Implements the PostStyleRequests abstract methods using the HTTP protocol for the AttestationEndpoint - """ - - def __init__(self): - IPostStyleRequestsAE.__init__(self) - HTTPRequester.__init__(self) - - @inlineCallbacks - def make_attestation_request(self, param_dict): - """ - Forward a request for the attestation of an attribute. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - 'attribute_name': attribute_name - 'mid': attester b64_mid - (optional) 'metadata': JSON style metadata required for the attestation process - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (request), and the rest of the parameters - request_parameters = {'type': 'request'} - - # Add the request parameters one-by-one; if required parameter is missing, then raise error - if 'attribute_name' in param_dict: - request_parameters['attribute_name'] = param_dict['attribute_name'] - else: - raise RequestException("Malformed request: did not specify the attribute_name") - - if 'mid' in param_dict: - request_parameters['mid'] = param_dict['mid'] - else: - raise RequestException("Malformed request: did not specify the attester's mid") - - if 'metadata' in param_dict: - request_parameters['metadata'] = param_dict['metadata'] - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'POST', - request_parameters, - param_dict.get('callback', None)) - returnValue(response) - - @inlineCallbacks - def make_attest(self, param_dict): - """ - Forward a request which attests an attestation request. - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - 'attribute_name': attribute_name - 'mid': attestee's b64_mid - 'attribute_value': b64 hash of the attestation blob - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (attest), and the rest of the parameters - request_parameters = {'type': 'attest'} - - # Add the request parameters one-by-one; if required parameter is missing, then raise error - if 'attribute_name' in param_dict: - request_parameters['attribute_name'] = param_dict['attribute_name'] - else: - raise RequestException("Malformed request: did not specify the attribute_name") - - if 'mid' in param_dict: - request_parameters['mid'] = param_dict['mid'] - else: - raise RequestException("Malformed request: did not specify the attestee's mid") - - if 'attribute_value' in param_dict: - request_parameters['attribute_value'] = param_dict['attribute_value'] - else: - raise RequestException("Malformed request: did not specify the attribute_value, i.e. the attestation" - "blob hash") - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'POST', - request_parameters, - param_dict.get('callback', None)) - returnValue(response) - - @inlineCallbacks - def make_verify(self, param_dict): - """ - Forward a request which demands the verification of an attestation - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - 'attribute_hash': the b64 hash of the attestation blob which needs to be verified - 'mid': verifier's b64_mid - 'attribute_values': a string of b64 encoded values, which are separated by ',' characters - e.g. "val_1,val_2,val_3, ..., val_N" - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (attest), and the rest of the parameters - request_parameters = {'type': 'verify'} - - # Add the request parameters one-by-one; if required parameter is missing, then raise error - if 'attribute_hash' in param_dict: - request_parameters['attribute_hash'] = param_dict['attribute_hash'] - else: - raise RequestException("Malformed request: did not specify the attribute_hash") - - if 'mid' in param_dict: - request_parameters['mid'] = param_dict['mid'] - else: - raise RequestException("Malformed request: did not specify the verifier's mid") - - if 'attribute_values' in param_dict: - request_parameters['attribute_values'] = param_dict['attribute_values'] - else: - raise RequestException("Malformed request: did not specify the attribute_values") - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'POST', - request_parameters, - param_dict.get('callback', None)) - - returnValue(response) - - @inlineCallbacks - def make_allow_verify(self, param_dict): - """ - Forward a request which requests that verifications be allowed for a particular peer for a particular attribute - - :param param_dict: Should have at least the following structure: - { - 'interface': target peer IP or alias - 'port': port_number - 'endpoint': endpoint_name - 'attribute_name': attribute_name - 'mid': verifier's b64_mid - (optional) 'callback': single parameter callback for the request's response - } - :return: the request's response - :raises RequestException: raised when the method could not find some element required for the construction of - the request - """ - interface, port, endpoint = HTTPRequester.get_access_parameters(param_dict) - - # Add the type of the request (request), and the rest of the parameters - request_parameters = {'type': 'allow_verify'} - - # Add the request parameters one-by-one; if required parameter is missing, then raise error - if 'attribute_name' in param_dict: - request_parameters['attribute_name'] = param_dict['attribute_name'] - else: - raise RequestException("Malformed request: did not specify the attribute_name") - - if 'mid' in param_dict: - request_parameters['mid'] = param_dict['mid'] - else: - raise RequestException("Malformed request: did not specify the attester's mid") - - response = yield self.make_request(HTTPRequester.basic_url_builder(interface, port, endpoint), - 'POST', - request_parameters, - param_dict.get('callback', None)) - returnValue(response) - - def string_to_url(string, quote_string=False, to_utf_8=False): """ Convert a string to a format which is compatible to it being passed via a url @@ -478,3 +119,9 @@ def string_to_url(string, quote_string=False, to_utf_8=False): string = string.replace("+", "%2B") if not quote_string else quote(string.replace("+", "%2B")) return string.encode('utf-8') if to_utf_8 else string + + +class RequestException(Exception): + """ + Custom exception used to model request errors + """ diff --git a/test_classes_list.txt b/test_classes_list.txt index af0d4f8e5..d129fc251 100644 --- a/test_classes_list.txt +++ b/test_classes_list.txt @@ -41,5 +41,5 @@ ipv8/test/dht/test_routing.py:TestBucket ipv8/test/dht/test_routing.py:TestRoutingTable ipv8/test/dht/test_storage.py:TestStorage -ipv8/test/REST/test_attestation_endpoint.py:TestAttestationEndpoint - +ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py:TestAttestationEndpoint +ipv8/test/REST/dht/test_dht_endpoint.py:TestDHTEndpoint From afe1a288b8c65e37cd3532db263564cd8f07d89a Mon Sep 17 00:00:00 2001 From: DanGraur Date: Thu, 15 Nov 2018 00:01:51 +0100 Subject: [PATCH 2/3] Blocks published to the DHT will now have their chunks signed using the node's secret key. In addition to this, this signature will be used on the receiving end for verifying the faithfulness of the chunks and, implicitly, of the block. Chunks will also be published under a new type of payload class, called DHTBlockPayload. The public key will be used (together with a special suffix) for publishing a block to the DHT (it should be mentioned that the raw key will be hashed before publishing). --- ipv8/REST/dht_endpoint.py | 48 ++++++++++++++++--------- ipv8/attestation/trustchain/payload.py | 25 +++++++++++++ ipv8/test/REST/dht/test_dht_endpoint.py | 45 +++++++++++++++-------- 3 files changed, 87 insertions(+), 31 deletions(-) diff --git a/ipv8/REST/dht_endpoint.py b/ipv8/REST/dht_endpoint.py index 27877d2dd..d2f84e8eb 100644 --- a/ipv8/REST/dht_endpoint.py +++ b/ipv8/REST/dht_endpoint.py @@ -4,7 +4,6 @@ from binascii import hexlify, unhexlify from hashlib import sha1 import json -import struct from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web import http, resource @@ -14,6 +13,9 @@ from ..attestation.trustchain.community import TrustChainCommunity from ..dht.discovery import DHTDiscoveryCommunity from ..attestation.trustchain.listener import BlockListener +from ..attestation.trustchain.payload import DHTBlockPayload +from ..messaging.serialization import Serializer +from ..keyvault.public.libnaclkey import LibNaCLPK class DHTEndpoint(resource.Resource): @@ -53,22 +55,25 @@ def should_sign(self, block): pass KEY_SUFFIX = b'_BLOCK' + CHUNK_SIZE = MAX_ENTRY_SIZE - DHTBlockPayload.PREAMBLE_OVERHEAD - 1 def __init__(self, dht, trustchain): resource.Resource.__init__(self) self.dht = dht self.trustchain = trustchain self.block_version = 0 + self.serializer = Serializer() - self._hashed_dht_key = sha1(self.trustchain.my_peer.mid + self.KEY_SUFFIX).digest() + self._hashed_dht_key = sha1(self.trustchain.my_peer.public_key.key_to_bin() + self.KEY_SUFFIX).digest() trustchain.add_listener(self, [trustchain.UNIVERSAL_BLOCK_LISTENER]) - def reconstruct_all_blocks(self, block_chunks): + def reconstruct_all_blocks(self, block_chunks, public_key): """ Given a list of block chunks, reconstruct all the blocks in a dictionary indexed by their version :param block_chunks: the list of block chunks + :param public_key: the public key of the publishing node, which will be used for verifying the chunks :return: a dictionary of reconstructed blocks (in packed format), indexed by the version of the blocks, and the maximal version """ @@ -76,22 +81,26 @@ def reconstruct_all_blocks(self, block_chunks): max_version = 0 for entry in block_chunks: - this_version = struct.unpack("H", entry[1:3])[0] - max_version = max_version if max_version > this_version else this_version + package = self.serializer.unpack_to_serializables([DHTBlockPayload, ], entry[1:])[0] - new_blocks[this_version] = entry[3:] + new_blocks[this_version] if this_version in new_blocks else entry[3:] + if public_key.verify(package.signature, str(package.version).encode('utf-8') + package.payload): + max_version = max_version if max_version > package.version else package.version + + new_blocks[package.version] = package.payload + new_blocks[package.version] \ + if package.version in new_blocks else package.payload return new_blocks, max_version - def _is_duplicate(self, latest_block): + def _is_duplicate(self, latest_block, public_key): """ Checks to see if this block has already been published to the DHT :param latest_block: the PACKED version of the latest block + :param public_key: the public key of the publishing node, which will be used for verifying the chunks :return: True if the block has indeed been published before, False otherwise """ block_chunks = self.dht.storage.get(self._hashed_dht_key) - new_blocks, _ = self.reconstruct_all_blocks(block_chunks) + new_blocks, _ = self.reconstruct_all_blocks(block_chunks, public_key) for val in new_blocks.values(): if val == latest_block: @@ -109,15 +118,20 @@ def publish_latest_block(self): if latest_block: # Get all the previously published blocks for this peer from the DHT, and check if this is a duplicate latest_block = latest_block.pack() - if self._is_duplicate(latest_block): + if self._is_duplicate(latest_block, self.trustchain.my_peer.public_key): returnValue(None) - version = struct.pack("H", self.block_version) - self.block_version += 1 + my_private_key = self.trustchain.my_peer.key + + for i in range(0, len(latest_block), self.CHUNK_SIZE): + chunk = latest_block[i: i + self.CHUNK_SIZE] + signature = my_private_key.signature(str(self.block_version).encode('utf-8') + chunk) + blob_chunk = self.serializer.pack_multiple( + DHTBlockPayload(signature, self.block_version, chunk).to_pack_list()) - for i in range(0, len(latest_block), MAX_ENTRY_SIZE - 3): - blob_chunk = version + latest_block[i:i + MAX_ENTRY_SIZE - 3] - yield self.dht.store_value(self._hashed_dht_key, blob_chunk) + yield self.dht.store_value(self._hashed_dht_key, blob_chunk[0]) + + self.block_version += 1 def render_GET(self, request): """ @@ -135,14 +149,16 @@ def render_GET(self, request): request.setResponseCode(http.BAD_REQUEST) return json.dumps({"error": "Must specify the peer's public key"}).encode('utf-8') - hash_key = sha1(b64decode(request.args[b'public_key'][0]) + self.KEY_SUFFIX).digest() + raw_public_key = b64decode(request.args[b'public_key'][0]) + hash_key = sha1(raw_public_key + self.KEY_SUFFIX).digest() block_chunks = self.dht.storage.get(hash_key) if not block_chunks: request.setResponseCode(http.NOT_FOUND) return json.dumps({"error": "Could not find a block for the specified key."}).encode('utf-8') - new_blocks, max_version = self.reconstruct_all_blocks(block_chunks) + target_public_key = LibNaCLPK(binarykey=raw_public_key[10:]) + new_blocks, max_version = self.reconstruct_all_blocks(block_chunks, target_public_key) return json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8') diff --git a/ipv8/attestation/trustchain/payload.py b/ipv8/attestation/trustchain/payload.py index f9345623f..30e822c05 100644 --- a/ipv8/attestation/trustchain/payload.py +++ b/ipv8/attestation/trustchain/payload.py @@ -333,3 +333,28 @@ def to_pack_list(self): @classmethod def from_unpack_list(cls, *args): return HalfBlockPairBroadcastPayload(*args) + + +class DHTBlockPayload(Payload): + """ + Class which represents the payloads published to the DHT for + """ + format_list = ['64s', 'H', 'raw'] + PREAMBLE_OVERHEAD = 66 # This stems from the 64 byte signature and the 2 byte unsigned short + + def __init__(self, signature, version, payload): + super(DHTBlockPayload, self).__init__() + self.signature = signature + self.version = version + self.payload = payload + + def to_pack_list(self): + return [ + ('64s', self.signature), + ('H', self.version), + ('raw', self.payload) + ] + + @classmethod + def from_unpack_list(cls, signature, version, payload): + return DHTBlockPayload(signature, version, payload) diff --git a/ipv8/test/REST/dht/test_dht_endpoint.py b/ipv8/test/REST/dht/test_dht_endpoint.py index a3a131b51..9df909b16 100644 --- a/ipv8/test/REST/dht/test_dht_endpoint.py +++ b/ipv8/test/REST/dht/test_dht_endpoint.py @@ -1,7 +1,7 @@ -import struct - from hashlib import sha1 from base64 import b64encode, b64decode +from collections import deque + from twisted.internet.defer import inlineCallbacks from .rest_peer_communication import HTTPGetRequesterDHT @@ -10,9 +10,9 @@ from ...mocking.rest.rest_peer_communication import string_to_url from ...mocking.rest.rest_api_peer import RestTestPeer from ...mocking.rest.comunities import TestDHTCommunity, TestTrustchainCommunity -from ....attestation.trustchain.payload import HalfBlockPayload +from ....attestation.trustchain.payload import HalfBlockPayload, DHTBlockPayload from ....attestation.trustchain.community import TrustChainCommunity -from ....dht.community import DHTCommunity, MAX_ENTRY_SIZE +from ....dht.community import DHTCommunity from ....REST.dht_endpoint import DHTBlockEndpoint from ....messaging.serialization import Serializer @@ -41,11 +41,15 @@ def publish_to_DHT(self, peer, key, data, numeric_version): :param numeric_version: the version of the data :return: None """ - version = struct.pack("H", numeric_version) + my_private_key = peer.get_keys()['my_peer'].key + + for i in range(0, len(data), DHTBlockEndpoint.CHUNK_SIZE): + chunk = data[i: i + DHTBlockEndpoint.CHUNK_SIZE] + signature = my_private_key.signature(str(numeric_version).encode('utf-8') + chunk) - for i in range(0, len(data), MAX_ENTRY_SIZE - 3): - blob_chunk = version + data[i:i + MAX_ENTRY_SIZE - 3] - yield peer.get_overlay_by_class(DHTCommunity).store_value(key, blob_chunk) + blob_chunk = self.serializer.pack_multiple(DHTBlockPayload(signature, numeric_version, chunk) + .to_pack_list()) + yield peer.get_overlay_by_class(DHTCommunity).store_value(key, blob_chunk[0]) def deserialize_payload(self, serializables, data): """ @@ -58,6 +62,13 @@ def deserialize_payload(self, serializables, data): payload = self.serializer.unpack_to_serializables(serializables, data) return payload[:-1][0] + def _increase_request_limit(self, new_request_limit): + for node in self.nodes: + routing_table = node.get_overlay_by_class(DHTCommunity).routing_table + for other in routing_table.closest_nodes(routing_table.my_node_id): + if other != node: + routing_table.get(other.id).last_queries = deque(maxlen=new_request_limit) + @inlineCallbacks def test_added_block_explicit(self): """ @@ -67,14 +78,15 @@ def test_added_block_explicit(self): 'port': self.nodes[0].port, 'interface': self.nodes[0].interface, 'endpoint': 'dht/block', - 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin())) } # Introduce the nodes yield self.introduce_nodes(DHTCommunity) # Manually add a block to the Trustchain original_block = TestBlock() - hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() + + DHTBlockEndpoint.KEY_SUFFIX).digest() yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536) @@ -99,7 +111,7 @@ def test_added_block_implicit(self): 'port': self.nodes[1].port, 'interface': self.nodes[1].interface, 'endpoint': 'dht/block', - 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin())) } # Introduce the nodes yield self.introduce_nodes(DHTCommunity) @@ -109,6 +121,7 @@ def test_added_block_implicit(self): yield self.nodes[0].get_overlay_by_class(TrustChainCommunity).create_source_block(b'test', {}) original_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).persistence.get(publisher_pk, 1) yield self.deliver_messages() + yield self.sleep() # Get the block through the REST API response = yield self._get_style_requests.make_dht_block(param_dict) @@ -116,7 +129,7 @@ def test_added_block_implicit(self): response = b64decode(response['block']) # Reconstruct the block from what was received in the response - payload = self.deserialize_payload((HalfBlockPayload,), response) + payload = self.deserialize_payload((HalfBlockPayload, ), response) reconstructed_block = self.nodes[0].get_overlay_by_class(TrustChainCommunity).get_block_class(payload.type)\ .from_payload(payload, self.serializer) @@ -132,15 +145,17 @@ def test_latest_block(self): 'port': self.nodes[1].port, 'interface': self.nodes[1].interface, 'endpoint': 'dht/block', - 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].mid)) + 'public_key': string_to_url(b64encode(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin())) } - # Introduce the nodes + # Introduce the nodes and increase the size of the request queues yield self.introduce_nodes(DHTCommunity) + self._increase_request_limit(20) # Manually add a block to the Trustchain original_block_1 = TestBlock(transaction={1: 'asd'}) original_block_2 = TestBlock(transaction={1: 'mmm'}) - hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() + + DHTBlockEndpoint.KEY_SUFFIX).digest() # Publish the two blocks under the same key in the first peer yield self.publish_to_DHT(self.nodes[0], hash_key, original_block_1.pack(), 4536) From 49f31b9beb776571b8523ed5ae0ef327014a4b64 Mon Sep 17 00:00:00 2001 From: DanGraur Date: Mon, 11 Feb 2019 11:31:44 +0100 Subject: [PATCH 3/3] Embedded the chunk position and the total number of chunks in the DHTBlockPayload. Removed the dht.storage.get and replaced them with dht.find_values calls, which are handled asynchronously. Finally, made a few changes and additions to the test_dht_endpoint module. --- ipv8/REST/dht_endpoint.py | 107 +++++++++++------- ipv8/attestation/trustchain/payload.py | 28 +++-- .../rest_peer_communication.py | 2 + .../test_attestation_endpoint.py | 2 + ipv8/test/REST/dht/rest_peer_communication.py | 2 + ipv8/test/REST/dht/test_dht_endpoint.py | 74 ++++++++++-- ipv8/test/mocking/rest/comunities.py | 2 + ipv8/test/mocking/rest/ipv8.py | 2 + 8 files changed, 160 insertions(+), 59 deletions(-) diff --git a/ipv8/REST/dht_endpoint.py b/ipv8/REST/dht_endpoint.py index d2f84e8eb..0ed8ffe36 100644 --- a/ipv8/REST/dht_endpoint.py +++ b/ipv8/REST/dht_endpoint.py @@ -5,7 +5,6 @@ from hashlib import sha1 import json -from twisted.internet.defer import inlineCallbacks, returnValue from twisted.web import http, resource from twisted.web.server import NOT_DONE_YET @@ -81,34 +80,24 @@ def reconstruct_all_blocks(self, block_chunks, public_key): max_version = 0 for entry in block_chunks: - package = self.serializer.unpack_to_serializables([DHTBlockPayload, ], entry[1:])[0] + package = self.serializer.unpack_to_serializables([DHTBlockPayload, ], entry)[0] - if public_key.verify(package.signature, str(package.version).encode('utf-8') + package.payload): + if public_key.verify(package.signature, str(package.version).encode('utf-8') + + str(package.block_position).encode('utf-8') + + str(package.block_count).encode('utf-8') + package.payload): max_version = max_version if max_version > package.version else package.version - new_blocks[package.version] = package.payload + new_blocks[package.version] \ - if package.version in new_blocks else package.payload + if package.version not in new_blocks: + new_blocks[package.version] = [''] * package.block_count - return new_blocks, max_version - - def _is_duplicate(self, latest_block, public_key): - """ - Checks to see if this block has already been published to the DHT - - :param latest_block: the PACKED version of the latest block - :param public_key: the public key of the publishing node, which will be used for verifying the chunks - :return: True if the block has indeed been published before, False otherwise - """ - block_chunks = self.dht.storage.get(self._hashed_dht_key) - new_blocks, _ = self.reconstruct_all_blocks(block_chunks, public_key) + new_blocks[package.version][package.block_position] = package.payload - for val in new_blocks.values(): - if val == latest_block: - return True + # Concatenate the blocks + for version in new_blocks: + new_blocks[version] = b''.join(new_blocks[version]) - return False + return new_blocks, max_version - @inlineCallbacks def publish_latest_block(self): """ Publish the latest block of this node's TrustChain to the DHT @@ -118,20 +107,41 @@ def publish_latest_block(self): if latest_block: # Get all the previously published blocks for this peer from the DHT, and check if this is a duplicate latest_block = latest_block.pack() - if self._is_duplicate(latest_block, self.trustchain.my_peer.public_key): - returnValue(None) - my_private_key = self.trustchain.my_peer.key + def on_success(block_chunks): + new_blocks, _ = self.reconstruct_all_blocks([x[0] for x in block_chunks], + self.trustchain.my_peer.public_key) + + # Check for duplication + for val in new_blocks.values(): + if val == latest_block: + return - for i in range(0, len(latest_block), self.CHUNK_SIZE): - chunk = latest_block[i: i + self.CHUNK_SIZE] - signature = my_private_key.signature(str(self.block_version).encode('utf-8') + chunk) - blob_chunk = self.serializer.pack_multiple( - DHTBlockPayload(signature, self.block_version, chunk).to_pack_list()) + # If we reached this point, it means the latest_block is novel + my_private_key = self.trustchain.my_peer.key - yield self.dht.store_value(self._hashed_dht_key, blob_chunk[0]) + # Get the total number of chunks in this blocks + total_blocks = len(latest_block) // self.CHUNK_SIZE + total_blocks += 1 if len(latest_block) % self.CHUNK_SIZE != 0 else 0 - self.block_version += 1 + # To make this faster we'll use addition instead of multiplication, and use a pointer + slice_pointer = 0 + + for i in range(total_blocks): + chunk = latest_block[slice_pointer: slice_pointer + self.CHUNK_SIZE] + slice_pointer += self.CHUNK_SIZE + signature = my_private_key.signature( + str(self.block_version).encode('utf-8') + str(i).encode('utf-8') + + str(total_blocks).encode('utf-8') + chunk) + blob_chunk = self.serializer.pack_multiple( + DHTBlockPayload(signature, self.block_version, i, total_blocks, chunk).to_pack_list()) + + self.dht.store_value(self._hashed_dht_key, blob_chunk[0]) + + self.block_version += 1 + + deferred = self.dht.find_values(self._hashed_dht_key) + deferred.addCallback(on_success) def render_GET(self, request): """ @@ -149,18 +159,35 @@ def render_GET(self, request): request.setResponseCode(http.BAD_REQUEST) return json.dumps({"error": "Must specify the peer's public key"}).encode('utf-8') + def on_success(block_chunks): + if not block_chunks: + request.setResponseCode(http.NOT_FOUND) + return json.dumps({"error": "Could not find any blocks for the specified key."}).encode('utf-8') + + target_public_key = LibNaCLPK(binarykey=raw_public_key[10:]) + # Discard the 2nd half of the tuples retrieved as a result of the DHT query + new_blocks, max_version = self.reconstruct_all_blocks([x[0] for x in block_chunks], target_public_key) + request.write(json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8')) + request.finish() + + def on_failure(failure): + request.setResponseCode(http.INTERNAL_SERVER_ERROR) + request.write(json.dumps({ + u"error": { + u"handled": True, + u"code": failure.value.__class__.__name__, + u"message": failure.value.message + } + })) + raw_public_key = b64decode(request.args[b'public_key'][0]) hash_key = sha1(raw_public_key + self.KEY_SUFFIX).digest() - block_chunks = self.dht.storage.get(hash_key) - - if not block_chunks: - request.setResponseCode(http.NOT_FOUND) - return json.dumps({"error": "Could not find a block for the specified key."}).encode('utf-8') - target_public_key = LibNaCLPK(binarykey=raw_public_key[10:]) - new_blocks, max_version = self.reconstruct_all_blocks(block_chunks, target_public_key) + deferred = self.dht.find_values(hash_key) + deferred.addCallback(on_success) + deferred.addErrback(on_failure) - return json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8') + return NOT_DONE_YET class DHTStatisticsEndpoint(resource.Resource): diff --git a/ipv8/attestation/trustchain/payload.py b/ipv8/attestation/trustchain/payload.py index 30e822c05..435320bbe 100644 --- a/ipv8/attestation/trustchain/payload.py +++ b/ipv8/attestation/trustchain/payload.py @@ -337,24 +337,38 @@ def from_unpack_list(cls, *args): class DHTBlockPayload(Payload): """ - Class which represents the payloads published to the DHT for + Class which represents the payloads published to the DHT for disseminating chunks of TrustChain blocks """ - format_list = ['64s', 'H', 'raw'] - PREAMBLE_OVERHEAD = 66 # This stems from the 64 byte signature and the 2 byte unsigned short - - def __init__(self, signature, version, payload): + format_list = ['64s', 'H', 'H', 'H', 'raw'] + PREAMBLE_OVERHEAD = 70 # This stems from the 64 byte signature and the 6 bytes of unsigned shorts + + def __init__(self, signature, version, block_position, block_count, payload): + """ + Construct a DHTBlockPayload object (which generally represents a chuck of a TrustChain block), + which should normally be serialized and published to the DHT + + :param signature: A signature of this block's body (version + block_position + block_count + payload) + :param version: This block's version (greater values indicate newer blocks) + :param block_position: This chunk's position in the original block (among the other chunks) + :param block_count: The total number of chunks in the block + :param payload: The chunk itself + """ super(DHTBlockPayload, self).__init__() self.signature = signature self.version = version + self.block_position = block_position + self.block_count = block_count self.payload = payload def to_pack_list(self): return [ ('64s', self.signature), ('H', self.version), + ('H', self.block_position), + ('H', self.block_count), ('raw', self.payload) ] @classmethod - def from_unpack_list(cls, signature, version, payload): - return DHTBlockPayload(signature, version, payload) + def from_unpack_list(cls, signature, version, payload, block_position, block_count): + return DHTBlockPayload(signature, version, payload, block_position, block_count) diff --git a/ipv8/test/REST/attestationendpoint/rest_peer_communication.py b/ipv8/test/REST/attestationendpoint/rest_peer_communication.py index c54aa7bb5..7cd1abb4f 100644 --- a/ipv8/test/REST/attestationendpoint/rest_peer_communication.py +++ b/ipv8/test/REST/attestationendpoint/rest_peer_communication.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from twisted.internet.defer import inlineCallbacks, returnValue from .peer_communication import IGetStyleRequestsAE, IPostStyleRequestsAE diff --git a/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py b/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py index c3b91f2a4..5e0804df6 100644 --- a/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py +++ b/ipv8/test/REST/attestationendpoint/test_attestation_endpoint.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from base64 import b64encode from hashlib import sha1 from json import dumps diff --git a/ipv8/test/REST/dht/rest_peer_communication.py b/ipv8/test/REST/dht/rest_peer_communication.py index 22225c515..8cad878f8 100644 --- a/ipv8/test/REST/dht/rest_peer_communication.py +++ b/ipv8/test/REST/dht/rest_peer_communication.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from twisted.internet.defer import inlineCallbacks, returnValue from ...mocking.rest.rest_peer_communication import HTTPRequester, RequestException, process_json_response diff --git a/ipv8/test/REST/dht/test_dht_endpoint.py b/ipv8/test/REST/dht/test_dht_endpoint.py index 9df909b16..f36269707 100644 --- a/ipv8/test/REST/dht/test_dht_endpoint.py +++ b/ipv8/test/REST/dht/test_dht_endpoint.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from hashlib import sha1 from base64 import b64encode, b64decode from collections import deque @@ -43,12 +45,21 @@ def publish_to_DHT(self, peer, key, data, numeric_version): """ my_private_key = peer.get_keys()['my_peer'].key - for i in range(0, len(data), DHTBlockEndpoint.CHUNK_SIZE): - chunk = data[i: i + DHTBlockEndpoint.CHUNK_SIZE] - signature = my_private_key.signature(str(numeric_version).encode('utf-8') + chunk) + # Get the total number of chunks in this blocks + total_blocks = len(data) // DHTBlockEndpoint.CHUNK_SIZE + total_blocks += 1 if len(data) % DHTBlockEndpoint.CHUNK_SIZE != 0 else 0 + + # To make this faster we'll use addition instead of multiplication, and use a pointer + slice_pointer = 0 + + for i in range(total_blocks): + chunk = data[slice_pointer: slice_pointer + DHTBlockEndpoint.CHUNK_SIZE] + slice_pointer += DHTBlockEndpoint.CHUNK_SIZE + signature = my_private_key.signature(str(numeric_version).encode('utf-8') + str(i).encode('utf-8') + + str(total_blocks).encode('utf-8') + chunk) - blob_chunk = self.serializer.pack_multiple(DHTBlockPayload(signature, numeric_version, chunk) - .to_pack_list()) + blob_chunk = self.serializer.pack_multiple(DHTBlockPayload(signature, numeric_version, i, total_blocks, + chunk).to_pack_list()) yield peer.get_overlay_by_class(DHTCommunity).store_value(key, blob_chunk[0]) def deserialize_payload(self, serializables, data): @@ -175,10 +186,10 @@ def test_latest_block(self): self.assertNotEqual(reconstructed_block, original_block_1, "The received block was equal to the older block") @inlineCallbacks - def test_block_duplication(self): + def test_block_duplication_explicit(self): """ - Test that a block which has already been pubished in the DHT will not be republished again; i.e. no - duplicate blocks in the DHT under different (embedded) versions. + Test that a block which has already been published in the DHT (explicitly) will not be republished again; + i.e. no duplicate blocks in the DHT under different (embedded) versions. """ # Introduce the nodes yield self.introduce_nodes(DHTCommunity) @@ -188,14 +199,51 @@ def test_block_duplication(self): self.nodes[0].get_overlay_by_class(TrustChainCommunity).persistence.add_block(original_block) # Publish the node to the DHT - hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest() + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() + + DHTBlockEndpoint.KEY_SUFFIX).digest() - result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) self.assertEqual(result, [], "There shouldn't be any blocks for this key") yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536) - result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) + self.assertNotEqual(result, [], "There should be at least one chunk for this key") + + chunk_number = len(result) + + # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT + self.nodes[0].get_overlay_by_class(TrustChainCommunity) \ + .notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER)) + yield self.deliver_messages() + yield self.sleep() + + # Query the DHT again + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) + self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen") + + @inlineCallbacks + def test_block_duplication_implicit(self): + """ + Test that a block which has already been published in the DHT (implicitly) will not be republished again; + i.e. no duplicate blocks in the DHT under different (embedded) versions. + """ + # Introduce the nodes + yield self.introduce_nodes(DHTCommunity) + + # Publish the node to the DHT + hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() + + DHTBlockEndpoint.KEY_SUFFIX).digest() + + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) + self.assertEqual(result, [], "There shouldn't be any blocks for this key") + + # Create a source block, and implicitly disseminate it + yield self.nodes[0].get_overlay_by_class(TrustChainCommunity).create_source_block(b'test', {}) + yield self.deliver_messages() + yield self.sleep() + + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) self.assertNotEqual(result, [], "There should be at least one chunk for this key") chunk_number = len(result) @@ -203,7 +251,9 @@ def test_block_duplication(self): # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT self.nodes[0].get_overlay_by_class(TrustChainCommunity) \ .notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER)) + yield self.deliver_messages() + yield self.sleep() # Query the DHT again - result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key) + result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key) self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen") diff --git a/ipv8/test/mocking/rest/comunities.py b/ipv8/test/mocking/rest/comunities.py index 812d1cbf2..0a3b5047a 100644 --- a/ipv8/test/mocking/rest/comunities.py +++ b/ipv8/test/mocking/rest/comunities.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from ....attestation.identity.community import IdentityCommunity from ....attestation.wallet.community import AttestationCommunity from ....attestation.trustchain.community import TrustChainCommunity diff --git a/ipv8/test/mocking/rest/ipv8.py b/ipv8/test/mocking/rest/ipv8.py index f1eb9a87a..cd12a0508 100644 --- a/ipv8/test/mocking/rest/ipv8.py +++ b/ipv8/test/mocking/rest/ipv8.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + import threading from twisted.internet.task import LoopingCall