diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 794fa66727fd6..94cd2b7dc700f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -48,7 +48,7 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator; -import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -734,7 +734,7 @@ public WindowedStream window(WindowAssigner reduce(ReduceFunction reducer) { - return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>( + return transform("Keyed Reduce", getType(), new StreamGroupedReduceOperator<>( clean(reducer), getType().createSerializer(getExecutionConfig()))); } @@ -1011,7 +1011,7 @@ public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { } protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) { - StreamGroupedReduce operator = new StreamGroupedReduce<>( + StreamGroupedReduceOperator operator = new StreamGroupedReduceOperator<>( clean(aggregate), getType().createSerializer(getExecutionConfig())); return transform("Keyed Aggregation", getType(), operator); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperator.java similarity index 89% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperator.java index 7338e1ccf0dcb..26833f7ee9d69 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperator.java @@ -30,7 +30,8 @@ */ @Internal -public class StreamGroupedReduce extends AbstractUdfStreamOperator> +public class StreamGroupedReduceOperator + extends AbstractUdfStreamOperator> implements OneInputStreamOperator { private static final long serialVersionUID = 1L; @@ -39,9 +40,9 @@ public class StreamGroupedReduce extends AbstractUdfStreamOperator values; - private TypeSerializer serializer; + private final TypeSerializer serializer; - public StreamGroupedReduce(ReduceFunction reducer, TypeSerializer serializer) { + public StreamGroupedReduceOperator(ReduceFunction reducer, TypeSerializer serializer) { super(reducer); this.serializer = serializer; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest.java similarity index 93% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest.java index f1c9bcadbbb01..9b2d224c2fc0b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest.java @@ -36,7 +36,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; /** - * Tests for {@link StreamGroupedReduce}. These test that: + * Tests for {@link StreamGroupedReduceOperator}. These test that: * *
    *
  • RichFunction methods are called correctly
  • @@ -45,14 +45,14 @@ *
*/ -public class StreamGroupedReduceTest { +public class StreamGroupedReduceOperatorTest { @Test public void testGroupedReduce() throws Exception { KeySelector keySelector = new IntegerKeySelector(); - StreamGroupedReduce operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE); + StreamGroupedReduceOperator operator = new StreamGroupedReduceOperator<>(new MyReducer(), IntSerializer.INSTANCE); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -84,8 +84,8 @@ public void testOpenClose() throws Exception { KeySelector keySelector = new IntegerKeySelector(); - StreamGroupedReduce operator = - new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE); + StreamGroupedReduceOperator operator = + new StreamGroupedReduceOperator<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 7cee609daabe6..e13068ecfccbe 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{AggregationFunction import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator} import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction} -import org.apache.flink.streaming.api.operators.StreamGroupedReduce +import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator import org.apache.flink.streaming.api.scala.function.StatefulFunction import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time @@ -487,7 +487,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] private def aggregate(aggregationFunc: AggregationFunction[T]): DataStream[T] = { val invokable = - new StreamGroupedReduce[T](aggregationFunc, dataType.createSerializer(executionConfig)) + new StreamGroupedReduceOperator[T](aggregationFunc, dataType.createSerializer(executionConfig)) new DataStream[T](javaStream.transform("aggregation", javaStream.getType(), invokable)) .asInstanceOf[DataStream[T]] diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java index 6951eb3ee1347..b058ff7d7ad37 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator; import org.apache.flink.shaded.guava18.com.google.common.collect.EvictingQueue; @@ -45,7 +45,7 @@ * of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from * a failure. * - *

The topology currently tests the proper behaviour of the {@link StreamGroupedReduce} operator. + *

The topology currently tests the proper behaviour of the {@link StreamGroupedReduceOperator} operator. */ @SuppressWarnings("serial") public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {