Skip to content

Commit

Permalink
[FLINK-16641][network] (Part#1) Introduce a new network message Backl…
Browse files Browse the repository at this point in the history
…ogAnnouncement which can bring the upstream buffer backlog to the downstream
  • Loading branch information
wsry authored and pnowojski committed Jul 12, 2021
1 parent 1746146 commit 3a46604
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,20 @@ private void decodeMsg(Object msg) throws Throwable {
}
}
}
} else if (msgClazz == NettyMessage.BacklogAnnouncement.class) {
NettyMessage.BacklogAnnouncement announcement = (NettyMessage.BacklogAnnouncement) msg;

RemoteInputChannel inputChannel = inputChannels.get(announcement.receiverId);
if (inputChannel == null || inputChannel.isReleased()) {
cancelRequestFor(announcement.receiverId);
return;
}

try {
inputChannel.onSenderBacklog(announcement.backlog);
} catch (Throwable throwable) {
inputChannel.onError(throwable);
}
} else {
throw new IllegalStateException(
"Received unknown message from producer: " + msg.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,54 @@ public String toString() {
}
}

/** Backlog announcement from the producer to the consumer for credit allocation. */
static class BacklogAnnouncement extends NettyMessage {

static final byte ID = 9;

final int backlog;

final InputChannelID receiverId;

BacklogAnnouncement(int backlog, InputChannelID receiverId) {
checkArgument(backlog > 0, "Must be positive.");
checkArgument(receiverId != null, "Must be not null.");

this.backlog = backlog;
this.receiverId = receiverId;
}

@Override
void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator)
throws IOException {
ByteBuf result = null;

try {
result =
allocateBuffer(
allocator, ID, Integer.BYTES + InputChannelID.getByteBufLength());
result.writeInt(backlog);
receiverId.writeTo(result);

out.write(result, promise);
} catch (Throwable t) {
handleException(result, null, t);
}
}

static BacklogAnnouncement readFrom(ByteBuf buffer) {
int backlog = buffer.readInt();
InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);

return new BacklogAnnouncement(backlog, receiverId);
}

@Override
public String toString() {
return String.format("BacklogAnnouncement(%d : %s)", backlog, receiverId);
}
}

// ------------------------------------------------------------------------

void writeToChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.net.ProtocolException;

import static org.apache.flink.runtime.io.network.netty.NettyMessage.BacklogAnnouncement;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;

Expand Down Expand Up @@ -59,6 +60,8 @@ public DecodingResult onChannelRead(ByteBuf data) throws Exception {
switch (msgId) {
case ErrorResponse.ID:
return DecodingResult.fullMessage(ErrorResponse.readFrom(fullFrameHeaderBuf));
case BacklogAnnouncement.ID:
return DecodingResult.fullMessage(BacklogAnnouncement.readFrom(fullFrameHeaderBuf));
default:
throw new ProtocolException("Received unknown message from producer: " + msgId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public Buffer requestBuffer() {
*
* @param backlog The number of unsent buffers in the producer's sub partition.
*/
void onSenderBacklog(int backlog) throws IOException {
public void onSenderBacklog(int backlog) throws IOException {
notifyBufferAvailable(bufferManager.requestFloatingBuffers(backlog + initialCredit));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,51 @@ public void testReceiveCompressedBuffer() throws Exception {
}
}

/** Verifies that {@link NettyMessage.BacklogAnnouncement} can be handled correctly. */
@Test
public void testReceiveBacklogAnnouncement() throws Exception {
int bufferSize = 1024;
int numBuffers = 10;
NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, bufferSize);
SingleInputGate inputGate =
new SingleInputGateBuilder().setSegmentProvider(networkBufferPool).build();
RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null);
inputGate.setInputChannels(inputChannel);

try {
BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
inputGate.setBufferPool(bufferPool);
inputGate.setupChannels();

CreditBasedPartitionRequestClientHandler handler =
new CreditBasedPartitionRequestClientHandler();
handler.addInputChannel(inputChannel);

assertEquals(2, inputChannel.getNumberOfAvailableBuffers());
assertEquals(0, inputChannel.unsynchronizedGetFloatingBuffersAvailable());

int backlog = 5;
NettyMessage.BacklogAnnouncement announcement =
new NettyMessage.BacklogAnnouncement(backlog, inputChannel.getInputChannelId());
handler.channelRead(null, announcement);
assertEquals(7, inputChannel.getNumberOfAvailableBuffers());
assertEquals(7, inputChannel.getNumberOfRequiredBuffers());
assertEquals(backlog, inputChannel.getSenderBacklog());
assertEquals(5, inputChannel.unsynchronizedGetFloatingBuffersAvailable());

backlog = 12;
announcement =
new NettyMessage.BacklogAnnouncement(backlog, inputChannel.getInputChannelId());
handler.channelRead(null, announcement);
assertEquals(10, inputChannel.getNumberOfAvailableBuffers());
assertEquals(14, inputChannel.getNumberOfRequiredBuffers());
assertEquals(backlog, inputChannel.getSenderBacklog());
assertEquals(8, inputChannel.unsynchronizedGetFloatingBuffersAvailable());
} finally {
releaseResource(inputGate, networkBufferPool);
}
}

/**
* Verifies that {@link RemoteInputChannel#onError(Throwable)} is called when a {@link
* BufferResponse} is received but no available buffer in input channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.Random;

import static org.apache.flink.runtime.io.network.netty.NettyMessage.BacklogAnnouncement;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
Expand Down Expand Up @@ -147,6 +148,14 @@ public void testCompressedBufferResponse() {
testBufferResponse(false, true);
}

@Test
public void testBacklogAnnouncement() {
BacklogAnnouncement expected = new BacklogAnnouncement(1024, inputChannelId);
BacklogAnnouncement actual = encodeAndDecode(expected, channel);
assertEquals(expected.backlog, actual.backlog);
assertEquals(expected.receiverId, actual.receiverId);
}

private void testErrorResponse(ErrorResponse expect) {
ErrorResponse actual = encodeAndDecode(expect, channel);
verifyErrorResponse(expect, actual);
Expand Down

0 comments on commit 3a46604

Please sign in to comment.