Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2236] Parallel Block importer #774

Merged
merged 20 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
bug fixes
* offer -> put
  * last stage is unbound so put won't stall the pipeline
* segment size = 200
* log level demotions
  • Loading branch information
shemnon committed Feb 6, 2019
commit 294197049351b0b5d154887554c166469e31d27d
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,12 @@ protected void executeTaskWithPeer(final EthPeer peer) {
final Optional<O> output = processStep(input, previousInput, peer);
output.ifPresent(
o -> {
try {
outboundQueue.put(o);
} catch (final InterruptedException e) {
processingException.compareAndSet(null, e);
}
results.add(o);
outboundQueue.offer(o);
});
previousInput = Optional.of(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public static class Builder {
private int downloaderHeaderRequestSize = 10;
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 20;
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
// final PipelinedImportChainSegmentTask<C, Block> importTask =
// PipelinedImportChainSegmentTask.forCheckpoints(
final ParallelImportChainSegmentTask<C, Block> importTask =
ParallelImportChainSegmentTask.forCheckpoints(
protocolSchedule,
Expand All @@ -107,7 +105,6 @@ private CompletableFuture<List<Block>> importBlocksForCheckpoints(
() -> HeaderValidationMode.DETACHED_ONLY,
checkpointHeaders);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
// importedBlocks = importTask.run();
}
return importedBlocks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static <C> DownloadHeaderSequenceTask<C> endingAtHeader(
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
LOG.debug(
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber() - 1);
"Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber());
final CompletableFuture<List<BlockHeader>> task =
downloadHeaders(assignedPeer).thenCompose(this::processHeaders);
return task.whenComplete(
Expand All @@ -128,8 +128,8 @@ protected CompletableFuture<List<BlockHeader>> executePeerTask(
if (lastFilledHeaderIndex == 0) {
LOG.debug(
"Finished downloading headers from {} to {}.",
startingBlockNumber,
referenceHeader.getNumber() - 1);
headers[0].getNumber(),
headers[segmentLength - 1].getNumber());
result.get().complete(Arrays.asList(headers));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ protected Optional<List<B>> processStep(
final List<BlockHeader> headers,
final Optional<List<BlockHeader>> previousHeaders,
final EthPeer peer) {
LOG.debug(
LOG.trace(
"Downloading bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
try {
final List<B> blocks = blockHandler.downloadBlocks(headers).get();
LOG.info(
LOG.debug(
"Downloaded bodies {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ protected Optional<List<BlockHeader>> processStep(
}
headers.add(nextCheckpointHeader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't DownloadHeaderSequenceTask.endingAtHeader include nextCheckpointHeader as the last thing it retrieves? So would this wind up adding it twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (headers.size() > 2) {
LOG.trace("Received {} headers", headers.size() - 2);
LOG.info(
LOG.debug(
"Downloaded headers {} to {}",
headers.get(1).getNumber(),
headers.get(headers.size() - 2).getNumber());
headers.get(headers.size() - 1).getNumber());
}
return Optional.of(headers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected void executeTaskWithPeer(final EthPeer peer) {
new ParallelValidateAndImportBodiesTask<>(
blockHandler,
downloadBodiesTask.getOutboundQueue(),
maxActiveChunks,
Integer.MAX_VALUE,
ethContext,
ethTasksTimer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,17 @@ protected Optional<List<BlockHeader>> processStep(
parentHeader,
protocolContext,
validationPolicy.getValidationModeForNextBlock())) {
LOG.debug(
"Validated Headers {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
// The first header will be imported by the previous request range.
return Optional.of(headers.subList(1, headers.size()));
} else {
LOG.debug(
"Could not validate Headers {} to {}",
headers.get(0).getNumber(),
headers.get(headers.size() - 1).getNumber());
// ignore the value, we only want the first exception to be there
failExceptionally(
new InvalidBlockException(
Expand Down