Skip to content

Commit

Permalink
[FLINK-22464][tests] Fix OperatorCoordinator test which is stalling o…
Browse files Browse the repository at this point in the history
…r slow with AdaptiveScheduler

This closes apache#16229
  • Loading branch information
rmetzger committed Jun 25, 2021
1 parent 7475027 commit be15eb0
Showing 1 changed file with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand All @@ -45,7 +50,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
Expand All @@ -55,7 +59,6 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -85,7 +88,10 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger {

@BeforeClass
public static void setupMiniClusterAndEnv() throws Exception {
flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
Configuration config = new Configuration();
// uncomment to run test with adaptive scheduler
// config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM, config);
flinkCluster.start();
TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
}
Expand Down Expand Up @@ -126,7 +132,6 @@ public void testOperatorEventLostNoReaderFailure() throws Exception {
* additionally a failure on the reader that triggers recovery.
*/
@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-22464
public void testOperatorEventLostWithReaderFailure() throws Exception {
final int[] eventsToLose = new int[] {1, 3};

Expand Down Expand Up @@ -205,6 +210,22 @@ private void runTest(boolean intermittentFailure) throws Exception {
env.setParallelism(1);
env.enableCheckpointing(50);

// This test depends on checkpoints persisting progress from the source before the
// artificial exception gets triggered. Otherwise, the job will run for a long time (or
// forever) because the exception will be thrown before any checkpoint successfully
// completes.
//
// Checkpoints are triggered once the checkpoint scheduler gets started + a random initial
// delay. For DefaultScheduler, this mechanism is fine, because DS starts the checkpoint
// coordinator, then requests the required slots and then deploys the tasks. These
// operations take enough time to have a checkpoint triggered by the time the task starts
// running. AdaptiveScheduler starts the CheckpointCoordinator right before deploying tasks
// (when slots are available already), hence tasks will start running almost immediately,
// and the checkpoint gets triggered too late (it won't be able to complete before the
// artificial failure from this test)
// Therefore, the TestingNumberSequenceSource waits for a checkpoint before emitting all
// required messages.

final DataStream<Long> numbers =
env.fromSource(
new TestingNumberSequenceSource(1L, numElements, 3),
Expand Down Expand Up @@ -257,7 +278,6 @@ private static CompletableFuture<Acknowledge> lateFuture() {
private static final class AssignAfterCheckpointEnumerator<
SplitT extends IteratorSourceSplit<?, ?>>
extends IteratorSourceEnumerator<SplitT> {

private final Queue<Integer> pendingRequests = new ArrayDeque<>();
private final SplitEnumeratorContext<?> context;

Expand Down Expand Up @@ -293,10 +313,12 @@ private static class TestingNumberSequenceSource extends NumberSequenceSource {
private static final long serialVersionUID = 1L;

private final int numSplits;
private final long numAllowedMessageBeforeCheckpoint;

public TestingNumberSequenceSource(long from, long to, int numSplits) {
super(from, to);
this.numSplits = numSplits;
this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits;
}

@Override
Expand All @@ -306,6 +328,40 @@ public TestingNumberSequenceSource(long from, long to, int numSplits) {
splitNumberRange(getFrom(), getTo(), numSplits);
return new AssignAfterCheckpointEnumerator<>(enumContext, splits);
}

@Override
public SourceReader<Long, NumberSequenceSplit> createReader(
SourceReaderContext readerContext) {
return new CheckpointListeningIteratorSourceReader(
readerContext, numAllowedMessageBeforeCheckpoint);
}
}

private static class CheckpointListeningIteratorSourceReader extends IteratorSourceReader {
private boolean checkpointed = false;
private long messagesProduced = 0;
private final long numAllowedMessageBeforeCheckpoint;

public CheckpointListeningIteratorSourceReader(
SourceReaderContext context, long waitForCheckpointAfterMessages) {
super(context);
this.numAllowedMessageBeforeCheckpoint = waitForCheckpointAfterMessages;
}

@Override
public InputStatus pollNext(ReaderOutput output) {
if (messagesProduced < numAllowedMessageBeforeCheckpoint || checkpointed) {
messagesProduced++;
return super.pollNext(output);
} else {
return InputStatus.NOTHING_AVAILABLE;
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
checkpointed = true;
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -421,11 +477,13 @@ private static class MiniClusterWithRpcIntercepting extends MiniCluster {

private boolean localRpcCreated;

public MiniClusterWithRpcIntercepting(final int numSlots) {
public MiniClusterWithRpcIntercepting(
final int numSlots, final Configuration configuration) {
super(
new MiniClusterConfiguration.Builder()
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumTaskManagers(1)
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlots)
.build());
}
Expand Down

0 comments on commit be15eb0

Please sign in to comment.