Skip to content

Commit

Permalink
[hotfix][test] Move CloseableRegistry as field in InputBuffersMetrics…
Browse files Browse the repository at this point in the history
…Test
  • Loading branch information
pnowojski committed Jul 9, 2019
1 parent a68dc3c commit 479b689
Showing 1 changed file with 59 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -43,6 +45,18 @@
*/
public class InputBuffersMetricsTest extends TestLogger {

private CloseableRegistry closeableRegistry;

@Before
public void setup() {
closeableRegistry = new CloseableRegistry();
}

@After
public void tearDown() throws IOException {
closeableRegistry.close();
}

@Test
public void testCalculateTotalBuffersSize() throws IOException {
int numberOfRemoteChannels = 2;
Expand All @@ -55,11 +69,13 @@ public void testCalculateTotalBuffersSize() throws IOException {
.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
.build();
closeableRegistry.registerCloseable(network::close);

SingleInputGate inputGate1 = buildInputGate(
network,
numberOfRemoteChannels,
numberOfLocalChannels).f0;
closeableRegistry.registerCloseable(inputGate1::close);

SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
Expand All @@ -69,15 +85,12 @@ public void testCalculateTotalBuffersSize() throws IOException {
exclusiveBuffersUsageGauge,
inputGates);

try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
closeableRegistry.registerCloseable(network::close);
closeableRegistry.registerCloseable(inputGate1::close);

closeableRegistry.registerCloseable(network::close);
closeableRegistry.registerCloseable(inputGate1::close);

assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
}
assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
}

@Test
Expand All @@ -96,6 +109,7 @@ public void testExclusiveBuffersUsage() throws IOException {
.setNetworkBuffersPerChannel(buffersPerChannel)
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
.build();
closeableRegistry.registerCloseable(network::close);

Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
network,
Expand All @@ -108,6 +122,8 @@ public void testExclusiveBuffersUsage() throws IOException {

SingleInputGate inputGate1 = tuple1.f0;
SingleInputGate inputGate2 = tuple2.f0;
closeableRegistry.registerCloseable(inputGate1::close);
closeableRegistry.registerCloseable(inputGate2::close);

List<RemoteInputChannel> remoteInputChannels = tuple1.f1;

Expand All @@ -119,29 +135,22 @@ public void testExclusiveBuffersUsage() throws IOException {
exclusiveBuffersUsageGauge,
inputGates);

try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);

int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;

int channelIndex = 1;
for (RemoteInputChannel channel : remoteInputChannels) {
drainAndValidate(
buffersPerChannel,
buffersPerChannel * channelIndex++,
channel,
closeableRegistry,
totalBuffers,
buffersPerChannel * totalNumberOfRemoteChannels,
exclusiveBuffersUsageGauge,
inputBuffersUsageGauge,
inputGate1);
}
} finally {
inputGate1.close();
inputGate2.close();
network.close();
assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);

int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;

int channelIndex = 1;
for (RemoteInputChannel channel : remoteInputChannels) {
drainAndValidate(
buffersPerChannel,
buffersPerChannel * channelIndex++,
channel,
totalBuffers,
buffersPerChannel * totalNumberOfRemoteChannels,
exclusiveBuffersUsageGauge,
inputBuffersUsageGauge,
inputGate1);
}
}

Expand All @@ -162,6 +171,7 @@ public void testFloatingBuffersUsage() throws IOException, InterruptedException
.setNetworkBuffersPerChannel(buffersPerChannel)
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
.build();
closeableRegistry.registerCloseable(network::close);

Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
network,
Expand All @@ -173,6 +183,8 @@ public void testFloatingBuffersUsage() throws IOException, InterruptedException
numberOfLocalChannelsGate2).f0;

SingleInputGate inputGate1 = tuple1.f0;
closeableRegistry.registerCloseable(inputGate1::close);
closeableRegistry.registerCloseable(inputGate2::close);

RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);

Expand All @@ -184,54 +196,47 @@ public void testFloatingBuffersUsage() throws IOException, InterruptedException
exclusiveBuffersUsageGauge,
inputGates);

try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);

// drain gate1's exclusive buffers
drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
// drain gate1's exclusive buffers
drainBuffer(buffersPerChannel, remoteInputChannel1);

int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;

remoteInputChannel1.requestSubpartition(0);
remoteInputChannel1.requestSubpartition(0);

int backlog = 3;
int totalRequestedBuffers = buffersPerChannel + backlog;
int backlog = 3;
int totalRequestedBuffers = buffersPerChannel + backlog;

remoteInputChannel1.onSenderBacklog(backlog);
remoteInputChannel1.onSenderBacklog(backlog);

assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());

drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
drainBuffer(totalRequestedBuffers, remoteInputChannel1);

assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
inputBuffersUsageGauge.getValue(), 0.0001);
} finally {
inputGate1.close();
inputGate2.close();
network.close();
}
assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
inputBuffersUsageGauge.getValue(), 0.0001);
}

private void drainAndValidate(
int numBuffersToRequest,
int totalRequestedBuffers,
RemoteInputChannel channel,
CloseableRegistry closeableRegistry,
int totalBuffers,
int totalExclusiveBuffers,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
SingleInputGate inputGate) throws IOException {

drainBuffer(numBuffersToRequest, channel, closeableRegistry);
drainBuffer(numBuffersToRequest, channel);
assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
}

private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException {
for (int i = 0; i < boundary; i++) {
Buffer buffer = channel.requestBuffer();
if (buffer != null) {
Expand Down

0 comments on commit 479b689

Please sign in to comment.