Merge pull request apache#12244 from pnowojski/f17258
[FLINK-17258][network] Fix couple of ITCases that were failing with enabled unaligned checkpoints
pnowojski committed May 21, 2020
1 parent 3ab5d58 commit 463dc8b
Showing 4 changed files with 18 additions and 12 deletions.
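
For context, unaligned checkpoints are enabled per job through the CheckpointConfig API that the last file in this diff exercises. Below is a minimal sketch of that switch, assuming an otherwise default environment; the class name and interval are illustrative, and only the enableUnalignedCheckpoints call is taken from the diff.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableUnalignedCheckpointsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000); // illustrative interval: checkpoint every second
        // Opt in to unaligned checkpoints; the last file of this diff toggles exactly this flag.
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        // ... define sources/sinks and call env.execute() as usual ...
    }
}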
File 1 of 4:
@@ -106,13 +106,13 @@ public void start(long checkpointId, CheckpointOptions checkpointOptions) {
             enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
             return result;
         });
-        Preconditions.checkArgument(put == result, "result future already present for checkpoint id: " + checkpointId);
+        Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
     }
 
     @Override
     public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
         LOG.debug(
-            "{} adding input data, checkpoint id: {}, channel: {}, startSeqNum: {}",
+            "{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
             taskName,
             checkpointId,
             info,
@@ -123,7 +123,7 @@ public void addInputData(long checkpointId, InputChannelInfo info, int startSeqN
     @Override
     public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
         LOG.debug(
-            "{} adding output data, checkpoint id: {}, channel: {}, startSeqNum: {}, num buffers: {}",
+            "{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
             taskName,
             checkpointId,
             info,
@@ -134,35 +134,35 @@ public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int st
 
     @Override
     public void finishInput(long checkpointId) {
-        LOG.debug("{} finishing input data, checkpoint id: {}", taskName, checkpointId);
+        LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
         enqueue(completeInput(checkpointId), false);
     }
 
     @Override
     public void finishOutput(long checkpointId) {
-        LOG.debug("{} finishing output data, checkpoint id: {}", taskName, checkpointId);
+        LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
         enqueue(completeOutput(checkpointId), false);
     }
 
     @Override
     public void abort(long checkpointId, Throwable cause) {
-        LOG.debug("{} aborting, checkpoint id: {}", taskName, checkpointId);
+        LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
         enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
         enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
         results.remove(checkpointId);
     }
 
     @Override
     public ChannelStateWriteResult getWriteResult(long checkpointId) {
-        LOG.debug("{} requested write result, checkpoint id: {}", taskName, checkpointId);
+        LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
         ChannelStateWriteResult result = results.get(checkpointId);
-        Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint id " + checkpointId);
+        Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
         return result;
     }
 
     @Override
     public void stop(long checkpointId) {
-        LOG.debug("{} stopping checkpoint id: {}", taskName, checkpointId);
+        LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
         results.remove(checkpointId);
     }
 
@@ -172,6 +172,7 @@ public void open() {
 
     @Override
     public void close() throws IOException {
+        LOG.debug("close, dropping checkpoints {}", results.keySet());
         results.clear();
         executor.close();
     }
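
The put == result check in the first hunk is the usual computeIfAbsent idiom for registering exactly one write result per checkpoint. A minimal sketch of that pattern follows; the map and future types are assumptions standing in for the writer's actual fields, which are not visible in this excerpt.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.util.Preconditions;

// Sketch only: the map and future types here are stand-ins for the writer's
// per-checkpoint bookkeeping, not the actual field declarations.
class CheckpointResultRegistrySketch {

    private final Map<Long, CompletableFuture<Void>> results = new ConcurrentHashMap<>();

    void start(long checkpointId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // computeIfAbsent returns an already-registered value unchanged, so the identity
        // check below fails precisely when the same checkpoint id is started twice.
        CompletableFuture<Void> put = results.computeIfAbsent(checkpointId, id -> result);
        Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
    }

    void stop(long checkpointId) {
        // stop/abort drop the entry again, mirroring results.remove(checkpointId) in the diff above
        results.remove(checkpointId);
    }
}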
File 2 of 4:
@@ -1210,7 +1210,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
     val expected = List("1", "3")
-    assertEquals(expected, sink.getRetractResults)
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
   @Test
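
The AggregateITCase change above sorts both sides of the assertion, presumably because the order in which the sink collects its retraction results is not guaranteed. A minimal JUnit sketch of the same order-insensitive comparison, written in Java with illustrative test and variable names:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.Test;

public class OrderInsensitiveAssertionSketch {

    @Test
    public void comparesResultsIgnoringArrivalOrder() {
        List<String> expected = new ArrayList<>(Arrays.asList("1", "3"));
        List<String> actual = new ArrayList<>(Arrays.asList("3", "1")); // arrival order can differ between runs

        // Sorting both sides turns an order-sensitive list comparison into a comparison of contents,
        // which is what the .sorted calls in the Scala test achieve.
        Collections.sort(expected);
        Collections.sort(actual);
        assertEquals(expected, actual);
    }
}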
File 3 of 4:
@@ -313,7 +313,9 @@ public void testDisposeSavepointWithCustomKvState() throws Exception {
                 String.valueOf(parallelism),
                 checkpointDir.toURI().toString(),
                 "5000",
-                outputDir.toURI().toString()})
+                outputDir.toURI().toString(),
+                "false" // Disable unaligned checkpoints as this test is triggering concurrent savepoints/checkpoints
+            })
             .build();
 
         TestStreamEnvironment.setAsContext(
File 4 of 4:
@@ -32,6 +32,7 @@
 import org.apache.flink.test.util.InfiniteIntegerSource;
 import org.apache.flink.util.Collector;
 
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -47,10 +48,12 @@ public static void main(String[] args) throws Exception {
         final String checkpointPath = args[1];
         final int checkpointingInterval = Integer.parseInt(args[2]);
         final String outputPath = args[3];
+        final Optional<Boolean> unalignedCheckpoints = args.length > 4 ? Optional.of(Boolean.parseBoolean(args[4])) : Optional.empty();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(parallelism);
-        env.enableCheckpointing(checkpointingInterval);
+        env.enableCheckpointing(checkpointingInterval);
+        unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
         env.setStateBackend(new FsStateBackend(checkpointPath));
 
         DataStream<Integer> source = env.addSource(new InfiniteIntegerSource());
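
Taken together, the last two files thread an optional trailing program argument through to the checkpoint configuration: the savepoint test passes "false" explicitly, while callers that omit the argument keep the default behaviour. A consolidated sketch under those assumptions follows; the class name and argument positions are illustrative, and only the Optional parsing and the enableUnalignedCheckpoints call come from the diff.

import java.util.Optional;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedFlagWiringSketch {

    public static void main(String[] args) throws Exception {
        // Caller side (cf. file 3): append "false" as an extra trailing argument to force aligned checkpoints.
        configure(new String[] {"4", "5000", "false"});
        // Callers that pass no trailing flag leave the environment's default untouched.
        configure(new String[] {"4", "5000"});
    }

    private static void configure(String[] args) {
        final int parallelism = Integer.parseInt(args[0]);
        final int checkpointingInterval = Integer.parseInt(args[1]);
        // Absent means "leave the default"; present means "force this value" (cf. file 4).
        final Optional<Boolean> unalignedCheckpoints =
                args.length > 2 ? Optional.of(Boolean.parseBoolean(args[2])) : Optional.empty();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointingInterval);
        unalignedCheckpoints.ifPresent(value -> env.getCheckpointConfig().enableUnalignedCheckpoints(value));
        // ... state backend, sources, sinks and env.execute() as in the program above ...
    }
}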
