Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#source(). #12766

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/dev/stream/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn

Source mySource = new MySource(...);

DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.source(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
Expand All @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.continuousSource(
val stream = env.source(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
Expand Down Expand Up @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).

{% highlight java %}
environment.continuousSource(
environment.source(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Expand Down
6 changes: 3 additions & 3 deletions docs/dev/stream/sources.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn

Source mySource = new MySource(...);

DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.source(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
Expand All @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.continuousSource(
val stream = env.source(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
Expand Down Expand Up @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).

{% highlight java %}
environment.continuousSource(
environment.source(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
public void testEnumeratorReaderCommunication() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.source(
source,
WatermarkStrategy.noWatermarks(),
"TestingSource");
Expand All @@ -57,11 +57,11 @@ public void testMultipleSources() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
DataStream<Integer> stream1 = env.continuousSource(
DataStream<Integer> stream1 = env.source(
source1,
WatermarkStrategy.noWatermarks(),
"TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(
DataStream<Integer> stream2 = env.source(
source2,
WatermarkStrategy.noWatermarks(),
"TestingSource2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def excluded_methods(cls):
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', "continuousSource"}
'clearJobListeners', 'getJobListeners', "source"}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,11 +1629,11 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, Strin
* @return the data stream constructed
*/
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(
public <OUT> DataStreamSource<OUT> source(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
return continuousSource(source, timestampsAndWatermarks, sourceName, null);
return source(source, timestampsAndWatermarks, sourceName, null);
}

/**
Expand All @@ -1650,7 +1650,7 @@ public <OUT> DataStreamSource<OUT> continuousSource(
* @return the data stream constructed
*/
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(
public <OUT> DataStreamSource<OUT> source(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public Integer map(Integer value) throws Exception {
@Test
public void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.source(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestingSource");
Expand Down Expand Up @@ -493,7 +493,7 @@ public void testInputOutputFormat() {
@Test
public void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.continuousSource(
DataStream<Integer> source = env.source(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestSource");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Create a DataStream using a [[Source]].
*/
@Experimental
def continuousSource[T: TypeInformation](
def source[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {

val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
asScalaStream(javaEnv.source(source, watermarkStrategy, sourceName, typeInfo))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.continuousSource(
val stream = env.source(
new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"test source")
Expand Down