Skip to content

Commit

Permalink
[FLINK-14239] Fix the max watermark in StreamSource may arrive the downstream operator early
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhaibotb authored and aljoscha committed Nov 14, 2019
1 parent c196d0f commit 3171edf
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,13 @@ public void run(final Object lockingObject,
// a final watermark that indicates that we reached the end of event-time, and end inputs
// of the operator chain
if (!isCanceledOrStopped()) {
advanceToEndOfEventTime();

// in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
// so we still need the following call to end the input
synchronized (lockingObject) {
operatorChain.endHeadOperatorInput(1);
}
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
Expand All @@ -128,6 +124,21 @@ public void advanceToEndOfEventTime() {
}
}

/**
 * Closes this operator.
 *
 * <p>After the superclass has been closed, and only if the source was neither canceled nor
 * stopped and a source context exists, {@link #advanceToEndOfEventTime()} is invoked. The
 * source context (if any) is closed afterwards in every case, including when one of the
 * previous steps throws.
 *
 * @throws Exception if closing the superclass or advancing to the end of event time fails
 */
@Override
public void close() throws Exception {
    try {
        super.close();

        // nothing more to signal if the source did not finish on its own,
        // or if no source context is available
        if (isCanceledOrStopped() || ctx == null) {
            return;
        }
        advanceToEndOfEventTime();
    } finally {
        // the context must be released regardless of how we leave the try block
        if (ctx != null) {
            ctx.close();
        }
    }
}

public void cancel() {
// important: marking the source as stopped has to happen before the function is stopped.
// the flag that tracks this status is volatile, so the memory model also guarantees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOper
StreamTask.createRecordWriters(operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
try {
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output), operatorChain);
operator.close();
} finally {
operatorChain.releaseOutputs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
Expand All @@ -37,21 +39,25 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -63,81 +69,64 @@ public class StreamSourceOperatorWatermarksTest {

@Test
public void testEmitMaxWatermarkForFiniteSource() throws Exception {
StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource());
StreamTaskTestHarness<String> testHarness = setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

// regular stream source operator
StreamSource<String, FiniteSource<String>> operator =
new StreamSource<>(new FiniteSource<String>());

final List<StreamElement> output = new ArrayList<>();

setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
OperatorChain<?, ?> operatorChain = createOperatorChain(operator);
try {
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output), operatorChain);
} finally {
operatorChain.releaseOutputs();
}
testHarness.invoke();
testHarness.waitForTaskCompletion();

assertEquals(1, output.size());
assertEquals(Watermark.MAX_WATERMARK, output.get(0));
assertEquals(1, testHarness.getOutput().size());
assertEquals(Watermark.MAX_WATERMARK, testHarness.getOutput().peek());
}

@Test
public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
public void testMaxWatermarkIsForwardedLastForFiniteSource() throws Exception {
StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource(true));
StreamTaskTestHarness<String> testHarness = setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

final List<StreamElement> output = new ArrayList<>();
testHarness.invoke();
testHarness.waitForTaskCompletion();

// regular stream source operator
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>("Hello"));
expectedOutput.add(Watermark.MAX_WATERMARK);

setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
operator.cancel();
TestHarnessUtil.assertOutputEquals("Output was not correct.",
expectedOutput,
testHarness.getOutput());
}

// run and exit
OperatorChain<?, ?> operatorChain = createOperatorChain(operator);
@Test
public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>());
StreamTaskTestHarness<String> testHarness = setupSourceStreamTask(
sourceOperator, BasicTypeInfo.STRING_TYPE_INFO, true);

testHarness.invoke();
try {
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output), operatorChain);
} finally {
operatorChain.releaseOutputs();
testHarness.waitForTaskCompletion();
fail("should throw an exception");
} catch (Throwable t) {
assertTrue(ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent());
}

assertTrue(output.isEmpty());
assertTrue(testHarness.getOutput().isEmpty());
}

@Test
public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>());
StreamTaskTestHarness<String> testHarness = setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

final List<StreamElement> output = new ArrayList<>();

// regular stream source operator
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());

setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);

// trigger an async cancel in a bit
new Thread("canceler") {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {}
operator.cancel();
}
}.start();

// run and wait to be canceled
OperatorChain<?, ?> operatorChain = createOperatorChain(operator);
testHarness.invoke();
testHarness.waitForTaskRunning();
Thread.sleep(200);
testHarness.getTask().cancel(); // cancel task
try {
operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output), operatorChain);
testHarness.waitForTaskCompletion();
} catch (Throwable t) {
assertTrue(ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent());
}
catch (InterruptedException ignored) {}
finally {
operatorChain.releaseOutputs();
}

assertTrue(output.isEmpty());
assertTrue(testHarness.getOutput().isEmpty());
}

@Test
Expand Down Expand Up @@ -184,14 +173,6 @@ public void testAutomaticWatermarkContext() throws Exception {

// ------------------------------------------------------------------------

/**
 * Convenience overload of {@code setupSourceOperator} that supplies a freshly created
 * {@link TestProcessingTimeService} as the fourth argument.
 *
 * @param operator the source operator to set up
 * @param timeChar the time characteristic to pass on
 * @param watermarkInterval the watermark interval to pass on
 * @throws Exception if the delegated setup fails
 */
@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(
        StreamSource<T, ?> operator,
        TimeCharacteristic timeChar,
        long watermarkInterval) throws Exception {
    final TestProcessingTimeService freshTimeService = new TestProcessingTimeService();
    setupSourceOperator(operator, timeChar, watermarkInterval, freshTimeService);
}

@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(
StreamSource<T, ?> operator,
Expand Down Expand Up @@ -223,21 +204,75 @@ private static <T> void setupSourceOperator(
operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
}

private static OperatorChain<?, ?> createOperatorChain(AbstractStreamOperator<?> operator) {
return new OperatorChain<>(
operator.getContainingTask(),
StreamTask.createRecordWriters(operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
/**
 * Creates a test harness for the given source operator whose task is NOT canceled
 * right after creation.
 *
 * @param sourceOperator the source operator to run inside the harness
 * @param outputType type information of the records produced by the source
 * @return the configured test harness
 */
private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(
        StreamSource<T, ?> sourceOperator,
        TypeInformation<T> outputType) {

    final boolean cancelRightAway = false;
    return setupSourceStreamTask(sourceOperator, outputType, cancelRightAway);
}

/**
 * Creates a {@link StreamTaskTestHarness} whose task is a {@link SourceStreamTask} and
 * configures it to run the given source operator with event time as time characteristic.
 *
 * @param sourceOperator the source operator to register in the harness' stream config
 * @param outputType type information of the records produced by the source
 * @param cancelImmediatelyAfterCreation if {@code true}, {@code cancel()} is called on the
 *     task as soon as the task factory has created it
 * @return the configured test harness
 */
private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(
        StreamSource<T, ?> sourceOperator,
        TypeInformation<T> outputType,
        final boolean cancelImmediatelyAfterCreation) {

    final StreamTaskTestHarness<T> harness = new StreamTaskTestHarness<>(
        (environment) -> {
            final SourceStreamTask<T, ?, ?> task = new SourceStreamTask<>(environment);
            if (!cancelImmediatelyAfterCreation) {
                return task;
            }

            // the factory cannot throw checked exceptions, so wrap whatever cancel() raises
            try {
                task.cancel();
            } catch (Exception cancelFailure) {
                throw new RuntimeException(cancelFailure);
            }
            return task;
        },
        outputType);
    harness.setupOutputForSingletonOperatorChain();

    final StreamConfig config = harness.getStreamConfig();
    config.setStreamOperator(sourceOperator);
    config.setOperatorID(new OperatorID());
    config.setTimeCharacteristic(TimeCharacteristic.EventTime);

    return harness;
}

// ------------------------------------------------------------------------

private static final class FiniteSource<T> implements SourceFunction<T> {
private static final class FiniteSource extends RichSourceFunction<String> {

private transient volatile boolean canceled = false;

private transient SourceContext<String> context;

private final boolean outputingARecordWhenClosing;

public FiniteSource() {
this(false);
}

public FiniteSource(boolean outputingARecordWhenClosing) {
this.outputingARecordWhenClosing = outputingARecordWhenClosing;
}

@Override
public void run(SourceContext<T> ctx) {}
public void run(SourceContext<String> ctx) {
context = ctx;
}

@Override
public void cancel() {}
public void close() {
if (!canceled && outputingARecordWhenClosing) {
context.collect("Hello");
}
}

@Override
public void cancel() {
canceled = true;
}
}

private static final class InfiniteSource<T> implements SourceFunction<T> {
Expand Down

0 comments on commit 3171edf

Please sign in to comment.