
Embedded the chunk position and the total number of chunks in the DHTBlockPayload. Removed the dht.storage.get calls and replaced them with dht.find_values calls, which are handled asynchronously. Finally, made a few changes and additions to the test_dht_endpoint module.

DanGraur authored and DanGraur committed Feb 10, 2019
1 parent afe1a28 commit 2b998c6
Showing 3 changed files with 149 additions and 58 deletions.
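The heart of the change is that every chunk published to the DHT now carries its position within the block and the block's total chunk count, so a reader can reassemble the block even when chunks arrive out of order. A minimal self-contained sketch of that idea (the names and CHUNK_SIZE value below are illustrative, not the module's actual serialization):

CHUNK_SIZE = 1000  # illustrative value; the endpoint defines its own

def split_into_chunks(block):
    # Tag each chunk with (position, total count), as DHTBlockPayload now does
    total = len(block) // CHUNK_SIZE + (1 if len(block) % CHUNK_SIZE else 0)
    return [(i, total, block[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE])
            for i in range(total)]

def reassemble(chunks):
    # Chunks may arrive in any order; the embedded position restores it
    slots = [b''] * chunks[0][1]
    for position, _, payload in chunks:
        slots[position] = payload
    return b''.join(slots)

packed = b'x' * 2500  # stand-in for a packed TrustChain block
assert reassemble(list(reversed(split_into_chunks(packed)))) == packed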
107 changes: 68 additions & 39 deletions ipv8/REST/dht_endpoint.py
@@ -81,34 +81,24 @@ def reconstruct_all_blocks(self, block_chunks, public_key):
         max_version = 0
 
         for entry in block_chunks:
-            package = self.serializer.unpack_to_serializables([DHTBlockPayload, ], entry[1:])[0]
+            package = self.serializer.unpack_to_serializables([DHTBlockPayload, ], entry)[0]
 
-            if public_key.verify(package.signature, str(package.version).encode('utf-8') + package.payload):
+            if public_key.verify(package.signature, str(package.version).encode('utf-8') +
+                                 str(package.block_position).encode('utf-8') +
+                                 str(package.block_count).encode('utf-8') + package.payload):
                 max_version = max_version if max_version > package.version else package.version
 
-                new_blocks[package.version] = package.payload + new_blocks[package.version] \
-                    if package.version in new_blocks else package.payload
-
-        return new_blocks, max_version
-
-    def _is_duplicate(self, latest_block, public_key):
-        """
-        Checks to see if this block has already been published to the DHT
-        :param latest_block: the PACKED version of the latest block
-        :param public_key: the public key of the publishing node, which will be used for verifying the chunks
-        :return: True if the block has indeed been published before, False otherwise
-        """
-        block_chunks = self.dht.storage.get(self._hashed_dht_key)
-        new_blocks, _ = self.reconstruct_all_blocks(block_chunks, public_key)
-
-        for val in new_blocks.values():
-            if val == latest_block:
-                return True
-
-        return False
+                if package.version not in new_blocks:
+                    new_blocks[package.version] = [''] * package.block_count
+
+                new_blocks[package.version][package.block_position] = package.payload
+
+        # Concatenate the blocks
+        for version in new_blocks:
+            new_blocks[version] = b''.join(new_blocks[version])
+
+        return new_blocks, max_version
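The verify call above checks a signature over str(version) + str(block_position) + str(block_count) + payload, all UTF-8 encoded. The helper below mirrors that byte layout, with HMAC standing in for the peer's Ed25519 key purely so the example is runnable:

import hmac
from hashlib import sha256

def signature_body(version, position, count, payload):
    # The exact bytes signed per chunk in publish_latest_block and
    # re-checked by public_key.verify in reconstruct_all_blocks
    return (str(version).encode('utf-8') + str(position).encode('utf-8') +
            str(count).encode('utf-8') + payload)

key = b'example-key'  # stand-in; the real code uses the peer's signing key
sig = hmac.new(key, signature_body(4536, 0, 3, b'chunk bytes'), sha256).digest()
check = hmac.new(key, signature_body(4536, 0, 3, b'chunk bytes'), sha256).digest()
assert hmac.compare_digest(sig, check)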

     @inlineCallbacks
     def publish_latest_block(self):
         """
         Publish the latest block of this node's TrustChain to the DHT
@@ -118,20 +108,41 @@ def publish_latest_block(self):
         if latest_block:
             # Get all the previously published blocks for this peer from the DHT, and check if this is a duplicate
             latest_block = latest_block.pack()
-            if self._is_duplicate(latest_block, self.trustchain.my_peer.public_key):
-                returnValue(None)
-
-            my_private_key = self.trustchain.my_peer.key
-
-            for i in range(0, len(latest_block), self.CHUNK_SIZE):
-                chunk = latest_block[i: i + self.CHUNK_SIZE]
-                signature = my_private_key.signature(str(self.block_version).encode('utf-8') + chunk)
-                blob_chunk = self.serializer.pack_multiple(
-                    DHTBlockPayload(signature, self.block_version, chunk).to_pack_list())
-
-                yield self.dht.store_value(self._hashed_dht_key, blob_chunk[0])
-
-            self.block_version += 1
+            def on_success(block_chunks):
+                new_blocks, _ = self.reconstruct_all_blocks(list(map(lambda x: x[0], block_chunks)),
+                                                            self.trustchain.my_peer.public_key)
+
+                # Check for duplication
+                for val in new_blocks.values():
+                    if val == latest_block:
+                        return
+
+                # If we reached this point, it means the latest_block is novel
+                my_private_key = self.trustchain.my_peer.key
+
+                # Get the total number of chunks in this block
+                total_blocks = len(latest_block) // self.CHUNK_SIZE
+                total_blocks += 1 if len(latest_block) % self.CHUNK_SIZE != 0 else 0
+
+                # To make this faster we'll use addition instead of multiplication, and use a pointer
+                slice_pointer = 0
+
+                for i in range(total_blocks):
+                    chunk = latest_block[slice_pointer: slice_pointer + self.CHUNK_SIZE]
+                    slice_pointer += self.CHUNK_SIZE
+                    signature = my_private_key.signature(
+                        str(self.block_version).encode('utf-8') + str(i).encode('utf-8') +
+                        str(total_blocks).encode('utf-8') + chunk)
+                    blob_chunk = self.serializer.pack_multiple(
+                        DHTBlockPayload(signature, self.block_version, i, total_blocks, chunk).to_pack_list())
+
+                    self.dht.store_value(self._hashed_dht_key, blob_chunk[0])
+
+                self.block_version += 1
+
+            deferred = self.dht.find_values(self._hashed_dht_key)
+            deferred.addCallback(on_success)
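The total_blocks computation above is ceiling division written out with a remainder check; a quick sanity check of the equivalence (a standalone sketch, not part of the endpoint):

def chunk_count(n, chunk_size):
    # The commit's formulation: floor division, plus one on a remainder
    total = n // chunk_size
    total += 1 if n % chunk_size != 0 else 0
    return total

# Equivalent to the usual ceiling-division one-liner
assert all(chunk_count(n, 1000) == (n + 999) // 1000 for n in range(5000))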

     def render_GET(self, request):
         """
@@ -149,18 +160,36 @@ def render_GET(self, request):
             request.setResponseCode(http.BAD_REQUEST)
             return json.dumps({"error": "Must specify the peer's public key"}).encode('utf-8')
 
+        def on_success(block_chunks):
+            if not block_chunks:
+                request.setResponseCode(http.NOT_FOUND)
+                return json.dumps({"error": "Could not find any blocks for the specified key."}).encode('utf-8')
+
+            target_public_key = LibNaCLPK(binarykey=raw_public_key[10:])
+            # Discard the 2nd half of the tuples retrieved as a result of the DHT query
+            new_blocks, max_version = self.reconstruct_all_blocks(list(map(lambda x: x[0], block_chunks)),
+                                                                  target_public_key)
+            request.write(json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8'))
+            request.finish()
+
+        def on_failure(failure):
+            request.setResponseCode(http.INTERNAL_SERVER_ERROR)
+            request.write(json.dumps({
+                u"error": {
+                    u"handled": True,
+                    u"code": failure.value.__class__.__name__,
+                    u"message": failure.value.message
+                }
+            }))
+
         raw_public_key = b64decode(request.args[b'public_key'][0])
         hash_key = sha1(raw_public_key + self.KEY_SUFFIX).digest()
-        block_chunks = self.dht.storage.get(hash_key)
 
-        if not block_chunks:
-            request.setResponseCode(http.NOT_FOUND)
-            return json.dumps({"error": "Could not find a block for the specified key."}).encode('utf-8')
-
-        target_public_key = LibNaCLPK(binarykey=raw_public_key[10:])
-        new_blocks, max_version = self.reconstruct_all_blocks(block_chunks, target_public_key)
+        deferred = self.dht.find_values(hash_key)
+        deferred.addCallback(on_success)
+        deferred.addErrback(on_failure)
 
-        return json.dumps({"block": b64encode(new_blocks[max_version]).decode('utf-8')}).encode('utf-8')
+        return NOT_DONE_YET
 
 
 class DHTStatisticsEndpoint(resource.Resource):
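render_GET now follows Twisted's standard asynchronous-response pattern: return NOT_DONE_YET immediately, then write and finish the request from the Deferred's callbacks. A minimal standalone resource using the same pattern (the delayed lookup is simulated with callLater; it stands in for the DHT query):

import json

from twisted.internet import reactor
from twisted.web import resource, server

class DelayedValue(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        def on_success(value):
            # The response is produced here, long after render_GET returned
            request.write(json.dumps({"value": value}).encode('utf-8'))
            request.finish()

        # Stand-in for an asynchronous DHT query resolving later
        reactor.callLater(0.5, on_success, "hello")
        return server.NOT_DONE_YET

Serving this with server.Site(DelayedValue()) shows the client waiting until request.finish() fires, which is exactly how the rewritten endpoint keeps the HTTP connection open while the DHT lookup completes.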
28 changes: 21 additions & 7 deletions ipv8/attestation/trustchain/payload.py
@@ -337,24 +337,38 @@ def from_unpack_list(cls, *args):

 class DHTBlockPayload(Payload):
     """
-    Class which represents the payloads published to the DHT for
+    Class which represents the payloads published to the DHT for disseminating chunks of TrustChain blocks
     """
-    format_list = ['64s', 'H', 'raw']
-    PREAMBLE_OVERHEAD = 66  # This stems from the 64 byte signature and the 2 byte unsigned short
-
-    def __init__(self, signature, version, payload):
+    format_list = ['64s', 'H', 'H', 'H', 'raw']
+    PREAMBLE_OVERHEAD = 70  # This stems from the 64 byte signature and the 6 bytes of unsigned shorts
+
+    def __init__(self, signature, version, block_position, block_count, payload):
+        """
+        Construct a DHTBlockPayload object (which generally represents a chunk of a TrustChain block),
+        which should normally be serialized and published to the DHT
+        :param signature: A signature of this block's body (version + block_position + block_count + payload)
+        :param version: This block's version (greater values indicate newer blocks)
+        :param block_position: This chunk's position in the original block (among the other chunks)
+        :param block_count: The total number of chunks in the block
+        :param payload: The chunk itself
+        """
         super(DHTBlockPayload, self).__init__()
         self.signature = signature
         self.version = version
+        self.block_position = block_position
+        self.block_count = block_count
         self.payload = payload
 
     def to_pack_list(self):
         return [
             ('64s', self.signature),
             ('H', self.version),
+            ('H', self.block_position),
+            ('H', self.block_count),
             ('raw', self.payload)
         ]
 
     @classmethod
-    def from_unpack_list(cls, signature, version, payload):
-        return DHTBlockPayload(signature, version, payload)
+    def from_unpack_list(cls, signature, version, payload, block_position, block_count):
+        return DHTBlockPayload(signature, version, payload, block_position, block_count)
72 changes: 60 additions & 12 deletions ipv8/test/REST/dht/test_dht_endpoint.py
@@ -43,12 +43,21 @@ def publish_to_DHT(self, peer, key, data, numeric_version):
"""
my_private_key = peer.get_keys()['my_peer'].key

for i in range(0, len(data), DHTBlockEndpoint.CHUNK_SIZE):
chunk = data[i: i + DHTBlockEndpoint.CHUNK_SIZE]
signature = my_private_key.signature(str(numeric_version).encode('utf-8') + chunk)
# Get the total number of chunks in this blocks
total_blocks = len(data) // DHTBlockEndpoint.CHUNK_SIZE
total_blocks += 1 if len(data) % DHTBlockEndpoint.CHUNK_SIZE != 0 else 0

blob_chunk = self.serializer.pack_multiple(DHTBlockPayload(signature, numeric_version, chunk)
.to_pack_list())
# To make this faster we'll use addition instead of multiplication, and use a pointer
slice_pointer = 0

for i in range(total_blocks):
chunk = data[slice_pointer: slice_pointer + DHTBlockEndpoint.CHUNK_SIZE]
slice_pointer += DHTBlockEndpoint.CHUNK_SIZE
signature = my_private_key.signature(str(numeric_version).encode('utf-8') + str(i).encode('utf-8') +
str(total_blocks).encode('utf-8') + chunk)

blob_chunk = self.serializer.pack_multiple(DHTBlockPayload(signature, numeric_version, i, total_blocks,
chunk).to_pack_list())
yield peer.get_overlay_by_class(DHTCommunity).store_value(key, blob_chunk[0])

def deserialize_payload(self, serializables, data):
Expand Down Expand Up @@ -175,10 +184,10 @@ def test_latest_block(self):
         self.assertNotEqual(reconstructed_block, original_block_1, "The received block was equal to the older block")
 
     @inlineCallbacks
-    def test_block_duplication(self):
+    def test_block_duplication_explicit(self):
         """
-        Test that a block which has already been pubished in the DHT will not be republished again; i.e. no
-        duplicate blocks in the DHT under different (embedded) versions.
+        Test that a block which has already been published in the DHT (explicitly) will not be republished again;
+        i.e. no duplicate blocks in the DHT under different (embedded) versions.
         """
         # Introduce the nodes
         yield self.introduce_nodes(DHTCommunity)
@@ -188,22 +197,61 @@ def test_block_duplication(self):
         self.nodes[0].get_overlay_by_class(TrustChainCommunity).persistence.add_block(original_block)
 
         # Publish the node to the DHT
-        hash_key = sha1(self.nodes[0].get_keys()['my_peer'].mid + DHTBlockEndpoint.KEY_SUFFIX).digest()
+        hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() +
+                        DHTBlockEndpoint.KEY_SUFFIX).digest()
 
-        result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key)
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
         self.assertEqual(result, [], "There shouldn't be any blocks for this key")
 
         yield self.publish_to_DHT(self.nodes[0], hash_key, original_block.pack(), 4536)
 
-        result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key)
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
         self.assertNotEqual(result, [], "There should be at least one chunk for this key")
 
         chunk_number = len(result)
 
         # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT
         self.nodes[0].get_overlay_by_class(TrustChainCommunity) \
             .notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER))
         yield self.deliver_messages()
+        yield self.sleep()
 
         # Query the DHT again
-        result = self.nodes[0].get_overlay_by_class(DHTCommunity).storage.get(hash_key)
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
         self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen")
+
+    @inlineCallbacks
+    def test_block_duplication_implicit(self):
+        """
+        Test that a block which has already been published in the DHT (implicitly) will not be republished again;
+        i.e. no duplicate blocks in the DHT under different (embedded) versions.
+        """
+        # Introduce the nodes
+        yield self.introduce_nodes(DHTCommunity)
+
+        # Publish the node to the DHT
+        hash_key = sha1(self.nodes[0].get_keys()['my_peer'].public_key.key_to_bin() +
+                        DHTBlockEndpoint.KEY_SUFFIX).digest()
+
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
+        self.assertEqual(result, [], "There shouldn't be any blocks for this key")
+
+        # Create a source block, and implicitly disseminate it
+        yield self.nodes[0].get_overlay_by_class(TrustChainCommunity).create_source_block(b'test', {})
+        yield self.deliver_messages()
+        yield self.sleep()
+
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
+        self.assertNotEqual(result, [], "There should be at least one chunk for this key")
+
+        chunk_number = len(result)
+
+        # Force call the method which publishes the latest block to the DHT and check that it did not affect the DHT
+        self.nodes[0].get_overlay_by_class(TrustChainCommunity) \
+            .notify_listeners(TestBlock(TrustChainCommunity.UNIVERSAL_BLOCK_LISTENER))
+        yield self.deliver_messages()
+        yield self.sleep()
+
+        # Query the DHT again
+        result = yield self.nodes[1].get_overlay_by_class(DHTCommunity).find_values(hash_key)
+        self.assertEqual(len(result), chunk_number, "The contents of the DHT have been changed. This should not happen")
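The rewritten assertions yield the Deferred returned by find_values instead of reading local storage synchronously. A minimal trial-style sketch of that yield-a-Deferred pattern (deferLater stands in for DHTCommunity.find_values; names are illustrative):

from twisted.internet import reactor, task
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest

class FindValuesStyleTest(unittest.TestCase):
    @inlineCallbacks
    def test_yield_a_deferred(self):
        # deferLater fires on the reactor after a delay, standing in for the
        # asynchronous DHT query; trial drives the reactor for us
        result = yield task.deferLater(reactor, 0.01, lambda: [b'chunk'])
        self.assertNotEqual(result, [], "There should be at least one chunk")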
