Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Implement max message size rather than limiting with fixed number of …
Browse files Browse the repository at this point in the history
…transactions (#1271)

* Implement max message size rather then cap with fixed number of transactions

Adding transactions to the RLP until the message size exceeds the limit and then send that.

* fix final variables

* Update AbstractRLPOutput.java

add javadoc

* pr discussion

put this factory method on LimitedTransactionsMessages rather than TransactionsMessage since it returns a LimitedTransactionsMessages.

* SpotlessApply

* fix PR discussion

- simplify design
- remove useless code

* Update LimitedTransactionsMessages.java

* fix PR discussion

- simplify logic
- add tests

* Update AbstractRLPOutput.java

* Update ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/messages/LimitedTransactionsMessages.java

Co-Authored-By: abdelhamidbakhta <[email protected]>

* Update Transaction.java

* fix PR discussion

* fix PR discussion

- add tests

* Update BlockDataGenerator.java

* Update LimitedTransactionsMessagesTest.java

fix PR unit test

* Update LimitedTransactionsMessagesTest.java

* Update LimitedTransactionsMessagesTest.java

Use LinkedHashSet to preserve order.

* Update LimitedTransactionsMessagesTest.java
  • Loading branch information
AbdelStark committed Apr 17, 2019
1 parent d14f1ec commit 7cf96ac
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.core;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toSet;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

import tech.pegasys.pantheon.crypto.SECP256K1;
Expand All @@ -31,6 +32,8 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.stream.IntStream;

public class BlockDataGenerator {
private final Random random;
Expand Down Expand Up @@ -241,6 +244,18 @@ public BlockBody body(final BlockOptions options) {
return new BlockBody(options.getTransactions(defaultTxs), ommers);
}

public Transaction transaction(final BytesValue payload) {
return Transaction.builder()
.nonce(positiveLong())
.gasPrice(Wei.wrap(bytes32()))
.gasLimit(positiveLong())
.to(address())
.value(Wei.wrap(bytes32()))
.payload(payload)
.chainId(1)
.signAndBuild(SECP256K1.KeyPair.generate());
}

public Transaction transaction() {
return Transaction.builder()
.nonce(positiveLong())
Expand All @@ -253,6 +268,34 @@ public Transaction transaction() {
.signAndBuild(SECP256K1.KeyPair.generate());
}

public Set<Transaction> transactions(final int n) {
Wei gasPrice = Wei.wrap(bytes32());
long gasLimit = positiveLong();
Address to = address();
Wei value = Wei.wrap(bytes32());
int chainId = 1;
Bytes32 payload = bytes32();
SECP256K1.Signature signature = SECP256K1.sign(payload, SECP256K1.KeyPair.generate());

final Set<Transaction> txs =
IntStream.range(0, n)
.parallel()
.mapToObj(
v ->
new Transaction(
v,
gasPrice,
gasLimit,
Optional.of(to),
value,
signature,
payload,
to,
chainId))
.collect(toSet());
return txs;
}

public TransactionReceipt receipt(final long cumulativeGasUsed) {
return new TransactionReceipt(hash(), cumulativeGasUsed, Arrays.asList(log(), log()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.HashSet;
import java.util.Set;

public final class LimitedTransactionsMessages {

private static final int LIMIT = 1048576;

private final TransactionsMessage transactionsMessage;
private final Set<Transaction> includedTransactions;

public LimitedTransactionsMessages(
final TransactionsMessage transactionsMessage, final Set<Transaction> includedTransactions) {
this.transactionsMessage = transactionsMessage;
this.includedTransactions = includedTransactions;
}

public static LimitedTransactionsMessages createLimited(
final Iterable<Transaction> transactions) {
final Set<Transaction> includedTransactions = new HashSet<>();
final BytesValueRLPOutput message = new BytesValueRLPOutput();
int messageSize = 0;
message.startList();
for (final Transaction transaction : transactions) {
final BytesValueRLPOutput encodedTransaction = new BytesValueRLPOutput();
transaction.writeTo(encodedTransaction);
BytesValue encodedBytes = encodedTransaction.encoded();
// Break if individual transaction size exceeds limit
if (encodedBytes.size() > LIMIT && (messageSize != 0)) {
break;
}
message.writeRLPUnsafe(encodedBytes);
includedTransactions.add(transaction);
// Check if last transaction to add to the message
messageSize += encodedBytes.size();
if (messageSize > LIMIT) {
break;
}
}
message.endList();
return new LimitedTransactionsMessages(
new TransactionsMessage(message.encoded()), includedTransactions);
}

public final TransactionsMessage getTransactionsMessage() {
return transactionsMessage;
}

public final Set<Transaction> getIncludedTransactions() {
return includedTransactions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static TransactionsMessage create(final Iterable<Transaction> transaction
return new TransactionsMessage(tmp.encoded());
}

private TransactionsMessage(final BytesValue data) {
TransactionsMessage(final BytesValue data) {
super(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.util.stream.Collectors.toSet;

import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.LimitedTransactionsMessages;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;

import java.util.Set;
import java.util.stream.StreamSupport;

class TransactionsMessageSender {

private static final int MAX_BATCH_SIZE = 10;
private final PeerTransactionTracker transactionTracker;

public TransactionsMessageSender(final PeerTransactionTracker transactionTracker) {
this.transactionTracker = transactionTracker;
}

public void sendTransactionsToPeers() {
transactionTracker.getEthPeersWithUnsentTransactions().forEach(this::sendTransactionsToPeer);
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true)
.parallel()
.forEach(this::sendTransactionsToPeer);
}

private void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
while (!allTxToSend.isEmpty()) {
final Set<Transaction> subsetToSend =
allTxToSend.stream().limit(MAX_BATCH_SIZE).collect(toSet());
allTxToSend.removeAll(subsetToSend);
final LimitedTransactionsMessages limitedTransactionsMessages =
LimitedTransactionsMessages.createLimited(allTxToSend);
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
peer.send(TransactionsMessage.create(subsetToSend));
peer.send(limitedTransactionsMessages.getTransactionsMessage());
} catch (final PeerNotConnected e) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.messages;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import org.junit.Test;

public class LimitedTransactionsMessagesTest {

private static final int LIMIT = 1048576;

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Set<Transaction> sampleTxs = generator.transactions(1);
private final TransactionsMessage sampleTransactionMessages =
TransactionsMessage.create(sampleTxs);
private final LimitedTransactionsMessages sampleLimitedTransactionsMessages =
new LimitedTransactionsMessages(sampleTransactionMessages, sampleTxs);

@Test
public void createLimited() {
final Set<Transaction> txs = generator.transactions(6000);
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(5219, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(781, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(781, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
assertTrue(
(firstMessage.getTransactionsMessage().getSize()
+ secondMessage.getTransactionsMessage().getSize())
< 2 * LIMIT);
}

@Test
public void createLimitedWithFirstTransactionExceedingLimit() {
final Set<Transaction> txs = new HashSet<>();
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(2, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}

@Test
public void createLimitedWithFirstTransactionExceedingLimit_2() {
final Set<Transaction> txs = new LinkedHashSet<>();

txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT + 100 - 180])));
txs.add(generator.transaction(BytesValue.wrap(new byte[LIMIT - 180])));
final LimitedTransactionsMessages firstMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, firstMessage.getIncludedTransactions().size());
txs.removeAll(firstMessage.getIncludedTransactions());
assertEquals(2, txs.size());
final LimitedTransactionsMessages secondMessage =
LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, secondMessage.getIncludedTransactions().size());
txs.removeAll(secondMessage.getIncludedTransactions());
assertEquals(1, txs.size());
final LimitedTransactionsMessages thirdMessage = LimitedTransactionsMessages.createLimited(txs);
assertEquals(1, thirdMessage.getIncludedTransactions().size());
txs.removeAll(thirdMessage.getIncludedTransactions());
assertEquals(0, txs.size());
}

@Test
public void getTransactionsMessage() {
assertEquals(
sampleTransactionMessages, sampleLimitedTransactionsMessages.getTransactionsMessage());
}

@Test
public void getIncludedTransactions() {
assertEquals(sampleTxs, sampleLimitedTransactionsMessages.getIncludedTransactions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static com.google.common.collect.Sets.newHashSet;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
Expand All @@ -30,7 +29,6 @@

import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import com.google.common.collect.Sets;
import org.junit.Test;
Expand All @@ -40,6 +38,7 @@ public class TransactionsMessageSenderTest {

private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);

private final BlockDataGenerator generator = new BlockDataGenerator();
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
Expand All @@ -63,14 +62,12 @@ public void shouldSendTransactionsToEachPeer() throws Exception {
}

@Test
public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> fifteenTransactions =
IntStream.range(0, 15).mapToObj(number -> generator.transaction()).collect(toSet());
fifteenTransactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions = generator.transactions(6000);

messageSender.sendTransactionsToPeers();
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));

messageSender.sendTransactionsToPeers();
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
ArgumentCaptor.forClass(MessageData.class);
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture());
Expand All @@ -82,10 +79,10 @@ public void shouldSendTransactionsInBatches() throws Exception {
final Set<Transaction> firstBatch = getTransactionsFromMessage(sentMessages.get(0));
final Set<Transaction> secondBatch = getTransactionsFromMessage(sentMessages.get(1));

assertThat(firstBatch).hasSize(10);
assertThat(secondBatch).hasSize(5);
assertThat(firstBatch).hasSize(5219);
assertThat(secondBatch).hasSize(781);

assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(fifteenTransactions);
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions);
}

private MessageData transactionsMessageContaining(final Transaction... transactions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public int encodedSize() {
checkState(stackSize == 1, "A list has been entered (startList()) but not left (endList())");
return payloadSizes[0];
}

/**
* Write the rlp encoded value to the provided {@link MutableBytesValue}
*
Expand Down

0 comments on commit 7cf96ac

Please sign in to comment.