Skip to content

Commit

Permalink
[FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#conti…
Browse files Browse the repository at this point in the history
…nuousSource() to StreamExecutionEnvironment#source().

This closes apache#12766
  • Loading branch information
becketqin authored and StephanEwen committed Jun 24, 2020
1 parent 95b9adb commit 49b5103
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 18 deletions.
6 changes: 3 additions & 3 deletions docs/dev/stream/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn

Source mySource = new MySource(...);

DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
Expand All @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.continuousSource(
val stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
Expand Down Expand Up @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).

{% highlight java %}
environment.continuousSource(
environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Expand Down
6 changes: 3 additions & 3 deletions docs/dev/stream/sources.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn

Source mySource = new MySource(...);

DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
Expand All @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.continuousSource(
val stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName")
Expand Down Expand Up @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre
The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java).

{% highlight java %}
environment.continuousSource(
environment.fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
public void testEnumeratorReaderCommunication() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"TestingSource");
Expand All @@ -57,11 +57,11 @@ public void testMultipleSources() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
DataStream<Integer> stream1 = env.continuousSource(
DataStream<Integer> stream1 = env.fromSource(
source1,
WatermarkStrategy.noWatermarks(),
"TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(
DataStream<Integer> stream2 = env.fromSource(
source2,
WatermarkStrategy.noWatermarks(),
"TestingSource2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def excluded_methods(cls):
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', "continuousSource"}
'clearJobListeners', 'getJobListeners', "fromSource"}


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,11 +1629,11 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, Strin
* @return the data stream constructed
*/
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(
public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
return continuousSource(source, timestampsAndWatermarks, sourceName, null);
return fromSource(source, timestampsAndWatermarks, sourceName, null);
}

/**
Expand All @@ -1650,7 +1650,7 @@ public <OUT> DataStreamSource<OUT> continuousSource(
* @return the data stream constructed
*/
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(
public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public Integer map(Integer value) throws Exception {
@Test
public void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = env.continuousSource(
DataStream<Integer> stream = env.fromSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestingSource");
Expand Down Expand Up @@ -493,7 +493,7 @@ public void testInputOutputFormat() {
@Test
public void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.continuousSource(
DataStream<Integer> source = env.fromSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"TestSource");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Create a DataStream using a [[Source]].
*/
@Experimental
def continuousSource[T: TypeInformation](
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {

val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest {
implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.continuousSource(
val stream = env.fromSource(
new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
WatermarkStrategy.noWatermarks(),
"test source")
Expand Down

0 comments on commit 49b5103

Please sign in to comment.