Skip to content

Commit

Permalink
[FLINK-17899][runtime] Add WatermarkStrategies to countinuousSource()…
Browse files Browse the repository at this point in the history
… methods in the DataStream API
  • Loading branch information
StephanEwen committed May 27, 2020
1 parent 31d669e commit 4fdcbce
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.flink.connector.base.source.reader;

import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
Expand All @@ -44,7 +45,10 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
public void testEnumeratorReaderCommunication() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
DataStream<Integer> stream = env.continuousSource(source, "TestingSource");
DataStream<Integer> stream = env.continuousSource(
source,
WatermarkStrategies.<Integer>noWatermarks().build(),
"TestingSource");
executeAndVerify(env, stream, 20);
}

Expand All @@ -53,8 +57,14 @@ public void testMultipleSources() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
DataStream<Integer> stream1 = env.continuousSource(source1, "TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(source2, "TestingSource2");
DataStream<Integer> stream1 = env.continuousSource(
source1,
WatermarkStrategies.<Integer>noWatermarks().build(),
"TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(
source2,
WatermarkStrategies.<Integer>noWatermarks().build(),
"TestingSource2");
executeAndVerify(env, stream1.union(stream2), 40);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
Expand Down Expand Up @@ -68,12 +69,13 @@ public DataStreamSource(SingleOutputStreamOperator<T> operator) {
public DataStreamSource(
StreamExecutionEnvironment environment,
Source<T, ?, ?> source,
WatermarkStrategy<T> timestampsAndWatermarks,
TypeInformation<T> outTypeInfo,
String sourceName) {
super(environment,
new SourceTransformation<>(
sourceName,
new SourceOperatorFactory<>(source),
new SourceOperatorFactory<>(source, timestampsAndWatermarks),
outTypeInfo,
environment.getParallelism()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
Expand Down Expand Up @@ -1614,8 +1615,11 @@ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, Strin
* @return the data stream constructed
*/
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(Source<OUT, ?, ?> source, String sourceName) {
return continuousSource(source, sourceName, null);
public <OUT> DataStreamSource<OUT> continuousSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
return continuousSource(source, timestampsAndWatermarks, sourceName, null);
}

/**
Expand All @@ -1634,10 +1638,18 @@ public <OUT> DataStreamSource<OUT> continuousSource(Source<OUT, ?, ?> source, St
@Experimental
public <OUT> DataStreamSource<OUT> continuousSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource<>(this, source, resolvedTypeInfo, sourceName);

final TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);

return new DataStreamSource<>(
this,
checkNotNull(source, "source"),
checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
checkNotNull(resolvedTypeInfo),
checkNotNull(sourceName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;

import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The Factory class for {@link SourceOperator}.
*/
Expand All @@ -46,17 +47,21 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
private final Source<OUT, ?, ?> source;

/** The event time setup (timestamp assigners, watermark generators, etc.). */
private final WatermarkStrategy<OUT> watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>();
private final WatermarkStrategy<OUT> watermarkStrategy;

/** The number of worker thread for the source coordinator. */
private final int numCoordinatorWorkerThread;

public SourceOperatorFactory(Source<OUT, ?, ?> source) {
this(source, 1);
public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
this(source, watermarkStrategy, 1);
}

public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerThread) {
this.source = source;
public SourceOperatorFactory(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> watermarkStrategy,
int numCoordinatorWorkerThread) {
this.source = checkNotNull(source);
this.watermarkStrategy = checkNotNull(watermarkStrategy);
this.numCoordinatorWorkerThread = numCoordinatorWorkerThread;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -250,8 +251,10 @@ public Integer map(Integer value) throws Exception {
@Test
public void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream =
env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestingSource");
DataStream<Integer> stream = env.continuousSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategies.<Integer>noWatermarks().build(),
"TestingSource");

OneInputTransformation<Integer, Integer> resultTransform = new OneInputTransformation<Integer, Integer>(
stream.getTransformation(),
Expand Down Expand Up @@ -458,8 +461,10 @@ public void testInputOutputFormat() {
@Test
public void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source =
env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestSource");
DataStream<Integer> source = env.continuousSource(
new MockSource(Boundedness.BOUNDED, 1),
WatermarkStrategies.<Integer>noWatermarks().build(),
"TestSource");
source.addSink(new DiscardingSink<>());

StreamGraph streamGraph = env.getStreamGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand Down Expand Up @@ -667,8 +668,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
@Experimental
def continuousSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): Unit = {
asScalaStream(javaEnv.continuousSource(source, sourceName))
asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName))
}

/**
Expand Down

0 comments on commit 4fdcbce

Please sign in to comment.