Skip to content

Commit

Permalink
Message Forward
Browse files Browse the repository at this point in the history
  -Added capability for forwarding messages
  -Added forward with header/body optimizations
  -Added isForwarded status flag on Message
  -Unit Tests
  -Cleanup, method renaming for consistency
  -Written against latest vert.x EB API
  • Loading branch information
Weston M. Price committed Oct 21, 2014
1 parent 540b2c0 commit b4defc9
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ test-results
test-tmp
*.class
ScratchPad.java
*.swp
9 changes: 9 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/eventbus/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public interface Message<T> {
void forward(String address);

void forward(String address, DeliveryOptions options);

/**
* Indicates whether or not this message has been received as a result of a forward operation
* versus a send or publish.
*
* @return whether or not the message has been fowarded
*/
boolean isForward();

}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ public EventBus publish(String address, Object message, DeliveryOptions options)
return this;
}

EventBus forward(String address, Object message) {
return forward(address, message, null);
}

EventBus forward(String address, Object message, DeliveryOptions options) {
sendOrPub(null, (MessageImpl)message, options, null);
return this;
}
@Override
public <T> MessageConsumer<T> consumer(String address) {
return new HandlerRegistration<>(address, false, false, -1);
Expand Down Expand Up @@ -709,7 +717,7 @@ private void schedulePing(ConnectionHolder holder) {
cleanupConnection(holder.theServerID, holder, true);
});
MessageImpl pingMessage = new MessageImpl<>(serverID, PING_ADDRESS, null, null, null, new NullMessageCodec(), true);
holder.socket.write(pingMessage.encodeToWire());
holder.socket.write(pingMessage.writeToWire());
});
}

Expand Down Expand Up @@ -846,11 +854,11 @@ private ConnectionHolder(NetClient client) {

void writeMessage(MessageImpl message) {
if (connected) {
socket.write(message.encodeToWire());
socket.write(message.writeToWire());
} else {
synchronized (this) {
if (connected) {
socket.write(message.encodeToWire());
socket.write(message.writeToWire());
} else {
pending.add(message);
}
Expand All @@ -872,7 +880,7 @@ synchronized void connected(ServerID theServerID, NetSocket socket) {
// Start a pinger
schedulePing(ConnectionHolder.this);
for (MessageImpl message : pending) {
socket.write(message.encodeToWire());
socket.write(message.writeToWire());
}
pending.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class MessageImpl<U, V> implements Message<V> {
private Buffer wireBuffer;
private int bodyPos;
private int headersPos;
private int absoluteBodyPos;
private int absoluteHeaderPos;
private boolean isForwarded;

public MessageImpl() {
}
Expand Down Expand Up @@ -91,8 +94,12 @@ private MessageImpl(MessageImpl<U, V> other) {
this.wireBuffer = other.wireBuffer;
this.bodyPos = other.bodyPos;
this.headersPos = other.headersPos;
this.headersPos = other.headersPos;
this.absoluteBodyPos = other.absoluteBodyPos;
this.absoluteHeaderPos = other.absoluteHeaderPos;
}
this.send = other.send;
this.isForwarded = other.isForwarded;
}

public MessageImpl<U, V> copyBeforeReceive() {
Expand All @@ -110,7 +117,7 @@ public MultiMap headers() {
if (headers == null) {
// The message has been read from the wire
if (headersPos != 0) {
decodeHeaders();
readHeaders();
}
if (headers == null) {
headers = new CaseInsensitiveHeaders();
Expand All @@ -124,7 +131,7 @@ public V body() {
// Lazily decode the body
if (receivedBody == null && bodyPos != 0) {
// The message has been read from the wire
decodeBody();
readBody();
}
return receivedBody;
}
Expand All @@ -134,8 +141,8 @@ public String replyAddress() {
return replyAddress;
}

public Buffer encodeToWire() {
int length = 1024; // TODO make this configurable
public Buffer writeToWire() {
int length = 1024;
Buffer buffer = Buffer.buffer(length);
buffer.appendInt(0);
buffer.appendByte(WIRE_PROTOCOL_VERSION);
Expand All @@ -154,12 +161,10 @@ public Buffer encodeToWire() {
}
buffer.appendInt(sender.port);
writeString(buffer, sender.host);
encodeHeaders(buffer);
buffer.appendByte(isForwarded ? (byte)0 : (byte)1);
writeHeaders(buffer);
writeBody(buffer);
buffer.setInt(0, buffer.length() - 4);
// if (buffer.length()> length) {
// log.warn("Overshot length " + length + " actual " + buffer.length());
// }
return buffer;
}

Expand Down Expand Up @@ -210,20 +215,25 @@ public void readFromWire(Buffer buffer, Map<String, MessageCodec> codecMap, Mess
bytes = buffer.getBytes(pos, pos + length);
String senderHost = new String(bytes, CharsetUtil.UTF_8);
pos += length;
headersPos = pos;

isForwarded = (buffer.getByte(pos) == 0);
pos++;

headersPos = absoluteHeaderPos = pos;
int headersLength = buffer.getInt(pos);
pos += headersLength;
bodyPos = pos;
bodyPos = absoluteBodyPos = pos;
sender = new ServerID(senderPort, senderHost);
wireBuffer = buffer;
}

private void decodeBody() {
private void readBody() {
receivedBody = messageCodec.decodeFromWire(bodyPos, wireBuffer);
bodyPos = 0;
}

private void encodeHeaders(Buffer buffer) {
private void writeHeaders(Buffer buffer) {
//Headers exist, have been decoded from the wire, and need to be reencoded
if (headers != null && !headers.isEmpty()) {
int headersLengthPos = buffer.getByteBuf().writerIndex();
buffer.appendInt(0);
Expand All @@ -235,12 +245,20 @@ private void encodeHeaders(Buffer buffer) {
}
int headersEndPos = buffer.getByteBuf().writerIndex();
buffer.setInt(headersLengthPos, headersEndPos - headersLengthPos);
} else {
}else if((headers == null && headersPos != 0) || (headers != null && headersPos == 0)){
//headers could exist, just have never been read
int length = wireBuffer.getInt(headersPos);
buffer.appendInt(length);
int numHeaders = wireBuffer.getInt(headersPos + 4);
buffer.appendInt(numHeaders);
byte[] rawHeaders = wireBuffer.getBytes(headersPos + 8, (headersPos) + length);
buffer.appendBytes(rawHeaders);
}else {
buffer.appendInt(4);
}
}

private void decodeHeaders() {
private void readHeaders() {
int length = wireBuffer.getInt(headersPos);
if (length != 0) {
headersPos += 4;
Expand All @@ -265,7 +283,14 @@ private void decodeHeaders() {
}

private void writeBody(Buffer buff) {
messageCodec.encodeToWire(buff, sentBody);
if(sentBody != null){
messageCodec.encodeToWire(buff, sentBody);
}else if((receivedBody == null && bodyPos != 0) || (receivedBody != null && bodyPos == 0)){
int length = wireBuffer.getInt(absoluteBodyPos);
byte[] bytes = wireBuffer.getBytes(absoluteBodyPos + 4, absoluteBodyPos + 4 + length);
buff.appendInt(bytes.length);
buff.appendBytes(bytes);
}
}

private void writeString(Buffer buff, String str) {
Expand All @@ -282,12 +307,14 @@ public void fail(int failureCode, String message) {

@Override
public void forward(String address) {

forward(address, null);
}

@Override
public void forward(String address, DeliveryOptions options) {

this.address = address;
this.isForwarded = true;
bus.forward(address, this, options);
}

@Override
Expand Down Expand Up @@ -332,5 +359,10 @@ private <R> void sendReply(MessageImpl msg, DeliveryOptions options, Handler<Asy
}
}

@Override
public boolean isForward() {
return this.isForwarded;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,60 @@ public void handle(AsyncResult<Void> ar) {
vertices[0].eventBus().publish(ADDRESS1, val);
await();
}

@Override
protected <T> void testForward(T val) {
startNodes(2);

vertices[0].eventBus().<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
msg.forward(ADDRESS2);
});

vertices[1].eventBus().<T>consumer(ADDRESS2).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
assertTrue(msg.isForward());
testComplete();
});

vertices[0].eventBus().send(ADDRESS1, val);
await();

}

@Override
protected <T> void testForward(T val, DeliveryOptions options) {

startNodes(2);
int expectedHeaders = options.getHeaders().size();
final String FIRST_KEY = "first";
final String SEC_KEY = "second";

vertices[0].eventBus().<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
if(!msg.isForward()){
msg.forward(ADDRESS2);
}else{
assertTrue(msg.isForward());
assertTrue(msg.headers().size() == expectedHeaders);
assertEquals(msg.headers().get(FIRST_KEY), "first");
assertEquals(msg.headers().get(SEC_KEY), "second");
testComplete();
}

});

vertices[1].eventBus().<T>consumer(ADDRESS2).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
assertTrue(msg.isForward());
msg.forward(ADDRESS1);
});

vertices[0].eventBus().send(ADDRESS1, val, options);
await();

}


@Test
public void testLocalHandlerNotReceive() throws Exception {
Expand Down
17 changes: 17 additions & 0 deletions vertx-core/src/test/java/io/vertx/test/core/EventBusTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ public void testReplyWithHeaders() {
testReply("foo", "foo", null, new DeliveryOptions().addHeader("uhqwduh", "qijwdqiuwd").addHeader("iojdijef", "iqjwddh"));
}

@Test
public void testForward(){
testForward(TestUtils.randomUnicodeString(100));
}

@Test
public void testForwardWithHeaders(){
DeliveryOptions options = new DeliveryOptions();
options.addHeader("first", "first");
options.addHeader("second", "second");
testForward("Test body", options);
}

protected abstract <T> void testForward(T val);

protected abstract <T> void testForward(T val, DeliveryOptions options);

protected <T> void testSend(T val) {
testSend(val, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1201,5 +1201,42 @@ public void testPump() {
Pump.pump(consumer, producer);
producer.write(str);
}
}

@Override
protected <T> void testForward(T val) {

eb.<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
msg.forward(ADDRESS2);

});

eb.<T>consumer(ADDRESS2).handler((Message<T> msg) -> {

assertEquals(val, msg.body());
testComplete();
});

eb.send(ADDRESS1, val);
await();
}

@Override
protected <T> void testForward(T val, DeliveryOptions options) {

eb.<T>consumer(ADDRESS1).handler((Message<T> msg) -> {
assertEquals(val, msg.body());
msg.forward(ADDRESS2);
});

eb.<T>consumer(ADDRESS2).handler((Message<T> msg) -> {

assertEquals(val, msg.body());
testComplete();
});

eb.send(ADDRESS1, val, options);
await();

}
}

0 comments on commit b4defc9

Please sign in to comment.