Skip to content

Commit

Permalink
[hotfix] Add Operator suffix to StreamGroupedReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Nov 3, 2020
1 parent 477d37d commit 5468fab
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
Expand Down Expand Up @@ -734,7 +734,7 @@ public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? supe
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
return transform("Keyed Reduce", getType(), new StreamGroupedReduce<>(
return transform("Keyed Reduce", getType(), new StreamGroupedReduceOperator<>(
clean(reducer), getType().createSerializer(getExecutionConfig())));
}

Expand Down Expand Up @@ -1011,7 +1011,7 @@ public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
}

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
StreamGroupedReduce<T> operator = new StreamGroupedReduce<>(
StreamGroupedReduceOperator<T> operator = new StreamGroupedReduceOperator<>(
clean(aggregate), getType().createSerializer(getExecutionConfig()));
return transform("Keyed Aggregation", getType(), operator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
*/

@Internal
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
public class StreamGroupedReduceOperator<IN>
extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> {

private static final long serialVersionUID = 1L;
Expand All @@ -39,9 +40,9 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc

private transient ValueState<IN> values;

private TypeSerializer<IN> serializer;
private final TypeSerializer<IN> serializer;

public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
public StreamGroupedReduceOperator(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
super(reducer);
this.serializer = serializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Tests for {@link StreamGroupedReduce}. These test that:
* Tests for {@link StreamGroupedReduceOperator}. These test that:
*
* <ul>
* <li>RichFunction methods are called correctly</li>
Expand All @@ -45,14 +45,14 @@
* </ul>
*/

public class StreamGroupedReduceTest {
public class StreamGroupedReduceOperatorTest {

@Test
public void testGroupedReduce() throws Exception {

KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();

StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
StreamGroupedReduceOperator<Integer> operator = new StreamGroupedReduceOperator<>(new MyReducer(), IntSerializer.INSTANCE);

OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO);
Expand Down Expand Up @@ -84,8 +84,8 @@ public void testOpenClose() throws Exception {

KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();

StreamGroupedReduce<Integer> operator =
new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
StreamGroupedReduceOperator<Integer> operator =
new StreamGroupedReduceOperator<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{AggregationFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator
import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
Expand Down Expand Up @@ -487,7 +487,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]

private def aggregate(aggregationFunc: AggregationFunction[T]): DataStream[T] = {
val invokable =
new StreamGroupedReduce[T](aggregationFunc, dataType.createSerializer(executionConfig))
new StreamGroupedReduceOperator[T](aggregationFunc, dataType.createSerializer(executionConfig))

new DataStream[T](javaStream.transform("aggregation", javaStream.getType(), invokable))
.asInstanceOf[DataStream[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;

import org.apache.flink.shaded.guava18.com.google.common.collect.EvictingQueue;

Expand All @@ -45,7 +45,7 @@
* of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
* a failure.
*
* <p>The topology currently tests the proper behaviour of the {@link StreamGroupedReduce} operator.
* <p>The topology currently tests the proper behaviour of the {@link StreamGroupedReduceOperator} operator.
*/
@SuppressWarnings("serial")
public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
Expand Down

0 comments on commit 5468fab

Please sign in to comment.