Skip to content

Commit

Permalink
[FLINK-23954][tests] Fix e2e test test_queryable_state_restart_tm.sh
Browse files Browse the repository at this point in the history
The problem was that we output the number of state entries at the time when the checkpoint
is confirmed. However, at this point in time we might have already added more elements to
the map. Hence, we risk reporting more elements than are actually contained in the checkpoint.
The solution is to remember what the element count was when creating the checkpoint.

This closes apache#17027.
  • Loading branch information
tillrohrmann committed Aug 30, 2021
1 parent 6a6dfef commit b57617f
Showing 1 changed file with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,22 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
Expand Down Expand Up @@ -125,7 +127,8 @@ public void run(SourceContext<Email> ctx) throws Exception {
new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");

synchronized (ctx.getCheckpointLock()) {
ctx.collect(new Email(emailId, timestamp, foo, label));
final Email email = new Email(emailId, timestamp, foo, label);
ctx.collect(email);
}

Thread.sleep(30L);
Expand All @@ -139,17 +142,17 @@ public void cancel() {
}

private static class TestFlatMap extends RichFlatMapFunction<Email, Object>
implements CheckpointListener {
implements CheckpointListener, CheckpointedFunction {

private static final long serialVersionUID = 7821128115999005941L;

private static final Logger LOG = LoggerFactory.getLogger(TestFlatMap.class);

private transient MapState<EmailId, EmailInformation> state;
private transient int count;
private transient Map<Long, Integer> countsAtCheckpoint;
private transient long lastCompletedCheckpoint;

@Override
public void open(Configuration parameters) throws Exception {
public void open(Configuration parameters) {
MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
new MapStateDescriptor<>(
QsConstants.STATE_NAME,
Expand All @@ -158,24 +161,34 @@ public void open(Configuration parameters) throws Exception {

stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
state = getRuntimeContext().getMapState(stateDescriptor);
updateCount();

LOG.info("Open {} with a count of {}.", getClass().getSimpleName(), count);
}

private void updateCount() throws Exception {
count = Iterables.size(state.keys());
countsAtCheckpoint = new HashMap<>();
count = -1;
lastCompletedCheckpoint = -1;
}

@Override
public void flatMap(Email value, Collector<Object> out) throws Exception {
state.put(value.getEmailId(), new EmailInformation(value));
updateCount();
count = Iterables.size(state.keys());
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
System.out.println("Count on snapshot: " + count); // we look for it in the test
if (checkpointId > lastCompletedCheckpoint) {
lastCompletedCheckpoint = checkpointId;
final int countAtCheckpoint = countsAtCheckpoint.remove(lastCompletedCheckpoint);

System.out.println(
"Count on snapshot: " + countAtCheckpoint); // we look for it in the test
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) {
countsAtCheckpoint.put(context.getCheckpointId(), count);
}

@Override
public void initializeState(FunctionInitializationContext context) {}
}
}

0 comments on commit b57617f

Please sign in to comment.