Skip to content

Commit

Permalink
[FLINK-7701][network] really fix watermark configuration order this time
Browse files Browse the repository at this point in the history
FLINK-7258 fixed this for large memory segment sizes but broke it for small
ones. This should fix both.

FYI: Newer Netty versions actually circumvent the problem by allowing to set
both watermarks at the same time.

This closes apache#4733.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Nov 6, 2017
1 parent fbc1263 commit 88737cf
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,19 @@ private boolean attemptToBind(final int port) throws Throwable {
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
.childHandler(new ServerChannelInitializer<>(handler));

final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
//noinspection ConstantConditions
// (ignore warning here to make this flexible in case the configuration values change)
if (LOW_WATER_MARK > defaultHighWaterMark) {
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
} else { // including (newHighWaterMark < defaultLowWaterMark)
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
}

try {
final ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,18 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws
}

// Low and high water marks for flow control
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
// hack around the impossibility (in the current netty version) to set both watermarks at
// the same time:
final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
final int newLowWaterMark = config.getMemorySegmentSize() + 1;
final int newHighWaterMark = 2 * config.getMemorySegmentSize();
if (newLowWaterMark > defaultHighWaterMark) {
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
} else { // including (newHighWaterMark < defaultLowWaterMark)
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
}

// SSL related configuration
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,54 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Verifies that high and low watermarks for {@link NettyServer} may be set to any (valid) values
* given by the user.
*/
public class NettyServerLowAndHighWatermarkTest {

/**
* Pick a larger memory segment size here in order to trigger
* <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
* Verify low and high watermarks being set correctly for larger memory segment sizes which
* trigger <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
*/
@Test
public void testLargeLowAndHighWatermarks() throws Throwable {
testLowAndHighWatermarks(65536);
}

/**
* Verify low and high watermarks being set correctly for smaller memory segment sizes than
* Netty's defaults.
*/
private final static int PageSize = 65536;
@Test
public void testSmallLowAndHighWatermarks() throws Throwable {
testLowAndHighWatermarks(1024);
}

/**
* Verifies that the high and low watermark are set in relation to the page size.
*
* <p> The high and low water marks control the data flow to the wire. If the Netty write buffer
* <p>The high and low water marks control the data flow to the wire. If the Netty write buffer
* has size greater or equal to the high water mark, the channel state becomes not-writable.
* Only when the size falls below the low water mark again, the state changes to writable again.
*
* <p> The Channel writability state needs to be checked by the handler when writing to the
* <p>The Channel writability state needs to be checked by the handler when writing to the
* channel and is not enforced in the sense that you cannot write a channel, which is in
* not-writable state.
*
* @param pageSize memory segment size to test with (influences high and low watermarks)
*/
@Test
public void testLowAndHighWatermarks() throws Throwable {
final int expectedLowWatermark = PageSize + 1;
final int expectedHighWatermark = 2 * PageSize;
private void testLowAndHighWatermarks(int pageSize) throws Throwable {
final int expectedLowWatermark = pageSize + 1;
final int expectedHighWatermark = 2 * pageSize;

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final NettyProtocol protocol = new NettyProtocol() {
@Override
public ChannelHandler[] getServerChannelHandlers() {
// The channel handler implements the test
return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(
expectedLowWatermark, expectedHighWatermark, error)};
pageSize, expectedLowWatermark, expectedHighWatermark, error)};
}

@Override
Expand All @@ -78,7 +95,7 @@ public ChannelHandler[] getClientChannelHandlers() {
}
};

final NettyConfig conf = createConfig(PageSize);
final NettyConfig conf = createConfig(pageSize);

final NettyServerAndClient serverAndClient = initServerAndClient(protocol, conf);

Expand All @@ -103,10 +120,12 @@ public ChannelHandler[] getClientChannelHandlers() {
/**
* This handler implements the test.
*
* <p> Verifies that the high and low watermark are set in relation to the page size.
* <p>Verifies that the high and low watermark are set in relation to the page size.
*/
private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter {

private final int pageSize;

private final int expectedLowWatermark;

private final int expectedHighWatermark;
Expand All @@ -115,7 +134,10 @@ private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandle

private boolean hasFlushed;

public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
public TestLowAndHighWatermarkHandler(
int pageSize, int expectedLowWatermark, int expectedHighWatermark,
AtomicReference<Throwable> error) {
this.pageSize = pageSize;
this.expectedLowWatermark = expectedLowWatermark;
this.expectedHighWatermark = expectedHighWatermark;
this.error = error;
Expand Down Expand Up @@ -167,14 +189,14 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E

super.exceptionCaught(ctx, cause);
}

private ByteBuf buffer() {
return NettyServerLowAndHighWatermarkTest.buffer(pageSize);
}
}

// ---------------------------------------------------------------------------------------------

private static ByteBuf buffer() {
return buffer(PageSize);
}

/**
* Creates a new buffer of the given size.
*/
Expand Down

0 comments on commit 88737cf

Please sign in to comment.