From 49b5103299374641662d66b5165441b532206b71 Mon Sep 17 00:00:00 2001 From: "Jiangjie (Becket) Qin" Date: Wed, 24 Jun 2020 21:22:18 +0800 Subject: [PATCH] [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#source(). This closes #12766 --- docs/dev/stream/sources.md | 6 +++--- docs/dev/stream/sources.zh.md | 6 +++--- .../base/source/reader/CoordinatedSourceITCase.java | 6 +++--- .../tests/test_stream_execution_environment_completeness.py | 2 +- .../api/environment/StreamExecutionEnvironment.java | 6 +++--- .../streaming/api/graph/StreamingJobGraphGeneratorTest.java | 4 ++-- .../streaming/api/scala/StreamExecutionEnvironment.scala | 4 ++-- .../api/scala/StreamExecutionEnvironmentTest.scala | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md index 669ca8f5697d2..3c3db9085e12b 100644 --- a/docs/dev/stream/sources.md +++ b/docs/dev/stream/sources.md @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn Source mySource = new MySource(...); -DataStream stream = env.continuousSource( +DataStream stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val mySource = new MySource(...) -val stream = env.continuousSource( +val stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName") @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java). {% highlight java %} -environment.continuousSource( +environment.fromSource( Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md index 3f20388e1a4fa..a063ecb54b2da 100644 --- a/docs/dev/stream/sources.zh.md +++ b/docs/dev/stream/sources.zh.md @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn Source mySource = new MySource(...); -DataStream stream = env.continuousSource( +DataStream stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val mySource = new MySource(...) -val stream = env.continuousSource( +val stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName") @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java). {% highlight java %} -environment.continuousSource( +environment.fromSource( Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java index 6582210f45f66..3280c387d1eb2 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase { public void testEnumeratorReaderCommunication() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED); - DataStream stream = env.continuousSource( + DataStream stream = env.fromSource( source, WatermarkStrategy.noWatermarks(), "TestingSource"); @@ -57,11 +57,11 @@ public void testMultipleSources() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED); MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED); - DataStream stream1 = env.continuousSource( + DataStream stream1 = env.fromSource( source1, WatermarkStrategy.noWatermarks(), "TestingSource1"); - DataStream stream2 = env.continuousSource( + DataStream stream2 = env.fromSource( source2, WatermarkStrategy.noWatermarks(), "TestingSource2"); diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index c91e086701cfe..9764cb445acf3 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -49,7 +49,7 @@ def excluded_methods(cls): 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners', "continuousSource"} + 'clearJobListeners', 'getJobListeners', "fromSource"} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 7de2e97fa2286..59837ac965eb7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1629,11 +1629,11 @@ public DataStreamSource addSource(SourceFunction function, Strin * @return the data stream constructed */ @Experimental - public DataStreamSource continuousSource( + public DataStreamSource fromSource( Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) { - return continuousSource(source, timestampsAndWatermarks, sourceName, null); + return fromSource(source, timestampsAndWatermarks, sourceName, null); } /** @@ -1650,7 +1650,7 @@ public DataStreamSource continuousSource( * @return the data stream constructed */ @Experimental - public DataStreamSource continuousSource( + public DataStreamSource fromSource( Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 42edc70dfffac..c9473258196ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -283,7 +283,7 @@ public Integer map(Integer value) throws Exception { @Test public void testOperatorCoordinatorAddedToJobVertex() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream stream = env.continuousSource( + DataStream stream = env.fromSource( new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestingSource"); @@ -493,7 +493,7 @@ public void testInputOutputFormat() { @Test public void testCoordinatedOperator() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream source = env.continuousSource( + DataStream source = env.fromSource( new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestSource"); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 925d571c54688..9ab3acf803696 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Create a DataStream using a [[Source]]. */ @Experimental - def continuousSource[T: TypeInformation]( + def fromSource[T: TypeInformation]( source: Source[T, _ <: SourceSplit, _], watermarkStrategy: WatermarkStrategy[T], sourceName: String): DataStream[T] = { val typeInfo = implicitly[TypeInformation[T]] - asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo)) + asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo)) } /** diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala index fa503e09d90bd..8765cb3f6065b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala @@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest { implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo() val env = StreamExecutionEnvironment.getExecutionEnvironment - val stream = env.continuousSource( + val stream = env.fromSource( new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks(), "test source")