From 4fdcbcea3e3968eec6b3b225458e2ce7b7ada1bd Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 26 May 2020 18:47:48 +0200 Subject: [PATCH] [FLINK-17899][runtime] Add WatermarkStrategies to countinuousSource() methods in the DataStream API --- .../reader/CoordinatedSourceITCase.java | 16 ++++++++++++--- .../api/datastream/DataStreamSource.java | 4 +++- .../StreamExecutionEnvironment.java | 20 +++++++++++++++---- .../api/operators/SourceOperatorFactory.java | 17 ++++++++++------ .../graph/StreamingJobGraphGeneratorTest.java | 13 ++++++++---- .../scala/StreamExecutionEnvironment.scala | 4 +++- 6 files changed, 55 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java index 1227e9f7066cd..4877defe26a51 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.eventtime.WatermarkStrategies; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; @@ -44,7 +45,10 @@ public class CoordinatedSourceITCase extends AbstractTestBase { public void testEnumeratorReaderCommunication() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED); - DataStream stream = env.continuousSource(source, "TestingSource"); + DataStream stream = env.continuousSource( + source, + WatermarkStrategies.noWatermarks().build(), + "TestingSource"); executeAndVerify(env, stream, 20); } @@ -53,8 +57,14 @@ public void testMultipleSources() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED); MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED); - DataStream stream1 = env.continuousSource(source1, "TestingSource1"); - DataStream stream2 = env.continuousSource(source2, "TestingSource2"); + DataStream stream1 = env.continuousSource( + source1, + WatermarkStrategies.noWatermarks().build(), + "TestingSource1"); + DataStream stream2 = env.continuousSource( + source2, + WatermarkStrategies.noWatermarks().build(), + "TestingSource2"); executeAndVerify(env, stream1.union(stream2), 40); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index 69c865814ad81..9fb2c26c630f6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Source; @@ -68,12 +69,13 @@ public DataStreamSource(SingleOutputStreamOperator operator) { public DataStreamSource( StreamExecutionEnvironment environment, Source source, + WatermarkStrategy timestampsAndWatermarks, TypeInformation outTypeInfo, String sourceName) { super(environment, new SourceTransformation<>( sourceName, - new SourceOperatorFactory<>(source), + new SourceOperatorFactory<>(source, timestampsAndWatermarks), outTypeInfo, environment.getParallelism())); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 52204cf5d26be..a47eaede19896 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.FilePathFilter; @@ -1614,8 +1615,11 @@ public DataStreamSource addSource(SourceFunction function, Strin * @return the data stream constructed */ @Experimental - public DataStreamSource continuousSource(Source source, String sourceName) { - return continuousSource(source, sourceName, null); + public DataStreamSource continuousSource( + Source source, + WatermarkStrategy timestampsAndWatermarks, + String sourceName) { + return continuousSource(source, timestampsAndWatermarks, sourceName, null); } /** @@ -1634,10 +1638,18 @@ public DataStreamSource continuousSource(Source source, St @Experimental public DataStreamSource continuousSource( Source source, + WatermarkStrategy timestampsAndWatermarks, String sourceName, TypeInformation typeInfo) { - TypeInformation resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo); - return new DataStreamSource<>(this, source, resolvedTypeInfo, sourceName); + + final TypeInformation resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo); + + return new DataStreamSource<>( + this, + checkNotNull(source, "source"), + checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), + checkNotNull(resolvedTypeInfo), + checkNotNull(sourceName)); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 02c7927ee9260..d1636cdc2a1a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -28,12 +28,13 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider; -import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware; import java.util.function.Function; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The Factory class for {@link SourceOperator}. */ @@ -46,17 +47,21 @@ public class SourceOperatorFactory extends AbstractStreamOperatorFactory source; /** The event time setup (timestamp assigners, watermark generators, etc.). */ - private final WatermarkStrategy watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>(); + private final WatermarkStrategy watermarkStrategy; /** The number of worker thread for the source coordinator. */ private final int numCoordinatorWorkerThread; - public SourceOperatorFactory(Source source) { - this(source, 1); + public SourceOperatorFactory(Source source, WatermarkStrategy watermarkStrategy) { + this(source, watermarkStrategy, 1); } - public SourceOperatorFactory(Source source, int numCoordinatorWorkerThread) { - this.source = source; + public SourceOperatorFactory( + Source source, + WatermarkStrategy watermarkStrategy, + int numCoordinatorWorkerThread) { + this.source = checkNotNull(source); + this.watermarkStrategy = checkNotNull(watermarkStrategy); this.numCoordinatorWorkerThread = numCoordinatorWorkerThread; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 1fb977a9a4616..fec7b0d32f41b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategies; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -250,8 +251,10 @@ public Integer map(Integer value) throws Exception { @Test public void testOperatorCoordinatorAddedToJobVertex() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream stream = - env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestingSource"); + DataStream stream = env.continuousSource( + new MockSource(Boundedness.BOUNDED, 1), + WatermarkStrategies.noWatermarks().build(), + "TestingSource"); OneInputTransformation resultTransform = new OneInputTransformation( stream.getTransformation(), @@ -458,8 +461,10 @@ public void testInputOutputFormat() { @Test public void testCoordinatedOperator() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream source = - env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestSource"); + DataStream source = env.continuousSource( + new MockSource(Boundedness.BOUNDED, 1), + WatermarkStrategies.noWatermarks().build(), + "TestSource"); source.addSink(new DiscardingSink<>()); StreamGraph streamGraph = env.getStreamGraph(); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 509ab65761b13..2b238451fd99d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import com.esotericsoftware.kryo.Serializer import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving} +import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation @@ -667,8 +668,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { @Experimental def continuousSource[T: TypeInformation]( source: Source[T, _ <: SourceSplit, _], + watermarkStrategy: WatermarkStrategy[T], sourceName: String): Unit = { - asScalaStream(javaEnv.continuousSource(source, sourceName)) + asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName)) } /**