Skip to content

Commit

Permalink
[FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest
Browse files Browse the repository at this point in the history
This closes apache#11453.
  • Loading branch information
GJL committed Mar 23, 2020
1 parent 6049b83 commit 50ee655
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void testSimpleQuery() throws Exception {
assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
long deserRequestId = MessageSerializer.getRequestId(buf);
KvStateResponse response = serializer.deserializeResponse(buf);
buf.release();

assertEquals(requestId, deserRequestId);

Expand Down Expand Up @@ -217,6 +218,7 @@ public void testQueryUnknownKvStateID() throws Exception {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();

assertEquals(requestId, response.getRequestId());

Expand Down Expand Up @@ -278,6 +280,7 @@ public void testQueryUnknownKey() throws Exception {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();

assertEquals(requestId, response.getRequestId());

Expand Down Expand Up @@ -363,6 +366,7 @@ public void clear() {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();

assertTrue(response.getCause().getMessage().contains("Expected test Exception"));

Expand Down Expand Up @@ -392,6 +396,7 @@ public void testCloseChannelOnExceptionCaught() throws Exception {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
buf.release();

assertTrue(response.getMessage().contains("Expected test Exception"));

Expand Down Expand Up @@ -454,6 +459,7 @@ public void testQueryExecutorShutDown() throws Throwable {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();

assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));

Expand Down Expand Up @@ -490,6 +496,7 @@ public void testUnexpectedMessage() throws Exception {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
buf.release();

assertEquals(0L, stats.getNumRequests());
assertEquals(0L, stats.getNumFailed());
Expand All @@ -505,6 +512,7 @@ public void testUnexpectedMessage() throws Exception {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
response = MessageSerializer.deserializeServerFailure(buf);
buf.release();

assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);

Expand Down Expand Up @@ -544,6 +552,7 @@ public void testIncomingBufferIsRecycled() throws Exception {

channel.writeInbound(unexpected);
assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
channel.finishAndReleaseAll();
}

/**
Expand Down Expand Up @@ -610,6 +619,7 @@ public void testSerializerMismatch() throws Exception {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();
assertEquals(182828L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));

Expand All @@ -626,6 +636,7 @@ public void testSerializerMismatch() throws Exception {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
response = MessageSerializer.deserializeRequestFailure(buf);
buf.release();
assertEquals(182829L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));

Expand Down Expand Up @@ -696,6 +707,7 @@ public void testChunkedResponse() throws Exception {

Object msg = readInboundBlocking(channel);
assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
((ChunkedByteBuf) msg).close();
}

// ------------------------------------------------------------------------
Expand Down

0 comments on commit 50ee655

Please sign in to comment.