Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add totalDiffculty to BlockPropagated events. #97

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.BlockPropagated;
import org.hyperledger.besu.plugin.data.Quantity;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.function.Supplier;

public class BesuEventsImpl implements BesuEvents {
private final BlockBroadcaster blockBroadcaster;
private final TransactionPool transactionPool;
Expand All @@ -36,7 +41,8 @@ public BesuEventsImpl(
@Override
public long addBlockPropagatedListener(final BlockPropagatedListener listener) {
return blockBroadcaster.subscribePropagateNewBlocks(
block -> listener.onBlockPropagated(block.getHeader()));
(block, totalDifficulty) ->
listener.onBlockPropagated(blockPropagated(block::getHeader, () -> totalDifficulty)));
}

@Override
Expand Down Expand Up @@ -75,4 +81,20 @@ public long addSyncStatusListener(final SyncStatusListener syncStatusListener) {
public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier);
}

private static BlockPropagated blockPropagated(
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
final Supplier<BlockHeader> blockHeaderSupplier,
final Supplier<Quantity> totalDifficultySupplier) {
return new BlockPropagated() {
@Override
public BlockHeader getBlockHeader() {
return blockHeaderSupplier.get();
}

@Override
public Quantity getTotalDifficulty() {
return totalDifficultySupplier.get();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.BlockPropagated;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.io.File;
Expand Down Expand Up @@ -66,7 +67,8 @@ public void stop() {
LOG.info("No longer listening with ID#" + subscriptionId);
}

private void onBlockAnnounce(final BlockHeader header) {
private void onBlockAnnounce(final BlockPropagated blockPropagated) {
final BlockHeader header = blockPropagated.getBlockHeader();
final int blockCount = blockCounter.incrementAndGet();
LOG.info("I got a new block! (I've seen {}) - {}", blockCount, header);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.BlockPropagated;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction;
import org.hyperledger.besu.testutil.TestClock;
Expand Down Expand Up @@ -144,24 +144,29 @@ public void syncStatusEventDoesNotFireAfterUnsubscribe() {

@Test
public void newBlockEventFiresAfterSubscribe() {
final AtomicReference<BlockHeader> result = new AtomicReference<>();
final AtomicReference<BlockPropagated> result = new AtomicReference<>();
serviceImpl.addBlockPropagatedListener(result::set);

final Block block = generateBlock();
assertThat(result.get()).isNull();
blockBroadcaster.propagate(generateBlock(), UInt256.of(1));
blockBroadcaster.propagate(block, UInt256.of(1));

assertThat(result.get()).isNotNull();
assertThat(result.get().getBlockHeader()).isEqualTo(block.getHeader());
assertThat(result.get().getTotalDifficulty()).isEqualTo(UInt256.of(1));
}

@Test
public void newBlockEventDoesNotFireAfterUnsubscribe() {
final AtomicReference<BlockHeader> result = new AtomicReference<>();
final AtomicReference<BlockPropagated> result = new AtomicReference<>();
final long id = serviceImpl.addBlockPropagatedListener(result::set);

assertThat(result.get()).isNull();
blockBroadcaster.propagate(generateBlock(), UInt256.of(1));
final Block block = generateBlock();
blockBroadcaster.propagate(block, UInt256.of(2));

assertThat(result.get()).isNotNull();
assertThat(result.get().getBlockHeader()).isEqualTo(block.getHeader());
assertThat(result.get().getTotalDifficulty()).isEqualTo(UInt256.of(2));
serviceImpl.removeBlockPropagatedListener(id);
result.set(null);

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ task checkMavenCoordianteCollisions {
def coordinate = it.publishing?.publications[0].coordinates
if (coordinates.containsKey(coordinate)) {
throw new GradleException("Duplicate maven coordinates detected, ${coordinate} is used by " +
"both ${coordinates[coordinate]} and ${it.path}.\n" +
"Please add a `publishing` script block to one or both subprojects.")
"both ${coordinates[coordinate]} and ${it.path}.\n" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix the task name at the same time?
checkMavenCoordianteCollisions should be checkMavenCoordinateCollisions

"Please add a `publishing` script block to one or both subprojects.")
}
coordinates[coordinate] = it.path
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@
import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.uint.UInt256;

import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BlockBroadcaster {
private static final Logger LOG = LogManager.getLogger();

private final EthContext ethContext;
private final Subscribers<Consumer<Block>> blockPropagatedSubscribers = Subscribers.create();
private final Subscribers<BlockPropagatedSubscriber> blockPropagatedSubscribers =
Subscribers.create();

public BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext;
}

public long subscribePropagateNewBlocks(final Consumer<Block> callback) {
public long subscribePropagateNewBlocks(final BlockPropagatedSubscriber callback) {
return blockPropagatedSubscribers.subscribe(callback);
}

Expand All @@ -45,7 +44,7 @@ public void unsubscribePropagateNewBlocks(final long id) {
}

public void propagate(final Block block, final UInt256 totalDifficulty) {
blockPropagatedSubscribers.forEach(listener -> listener.accept(block));
blockPropagatedSubscribers.forEach(listener -> listener.accept(block, totalDifficulty));
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
ethContext
.getEthPeers()
Expand All @@ -61,4 +60,9 @@ public void propagate(final Block block, final UInt256 totalDifficulty) {
}
});
}

@FunctionalInterface
public interface BlockPropagatedSubscriber {
void accept(Block block, UInt256 totalDifficulty);
}
}
2 changes: 1 addition & 1 deletion plugin-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files
knownHash = 'b8BCiNvy9vSYsTtS1NfszsVT0EMV8tiNc7igzXzTrak='
knownHash = 'ajwXqGDmfm3B7QKFEXIPHdDi8tUrv7enfJO7fHQTFbA='
}
check.dependsOn('checkAPIChanges')

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.data;

import org.hyperledger.besu.plugin.Unstable;

/** The minimum set of data for a BlockPropagated. */
@Unstable
public interface BlockPropagated {

/**
* A {@link BlockHeader} object.
*
* @return A {@link BlockHeader}
*/
BlockHeader getBlockHeader();

/**
* A scalar value corresponding to the total difficulty.
*
* @return A scalar value corresponding to the total difficulty.
*/
Quantity getTotalDifficulty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.plugin.services;

import org.hyperledger.besu.plugin.Unstable;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.BlockPropagated;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction;

Expand Down Expand Up @@ -109,9 +109,9 @@ interface BlockPropagatedListener {
* <p>The block may not have been imported to the local chain yet and may fail later
* validations.
*
* @param blockHeader the new block header.
* @param blockPropagated block being propagated.
*/
void onBlockPropagated(BlockHeader blockHeader);
void onBlockPropagated(BlockPropagated blockPropagated);
}

/** The listener interface for receiving new transaction added events. */
Expand Down