Skip to content

Commit

Permalink
[FLINK-7660] Support sideOutput in ProcessAllWindowFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenli86 authored and aljoscha committed Oct 12, 2017
1 parent 89de78c commit 39682c4
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,15 @@ public void process(final Context context, Iterable<T> values, Collector<R> out)
}

this.ctx.window = context.window();
this.ctx.windowState = context.windowState();
this.ctx.globalState = context.globalState();
this.ctx.context = context;

windowFunction.process(ctx, Collections.singletonList(result), out);
}

@Override
public void clear(final Context context) throws Exception {
this.ctx.window = context.window();
this.ctx.windowState = context.windowState();
this.ctx.globalState = context.globalState();
this.ctx.context = context;
windowFunction.clear(ctx);
}

Expand All @@ -136,5 +134,4 @@ public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig execut

serializedInitialValue = baos.toByteArray();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.OutputTag;

/**
* Internal reusable context wrapper.
Expand All @@ -34,8 +35,7 @@ public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
extends ProcessAllWindowFunction<IN, OUT, W>.Context {

W window;
KeyedStateStore windowState;
KeyedStateStore globalState;
ProcessAllWindowFunction.Context context;

InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
function.super();
Expand All @@ -48,11 +48,16 @@ public W window() {

@Override
public KeyedStateStore windowState() {
return windowState;
return context.windowState();
}

@Override
public KeyedStateStore globalState() {
return globalState;
return context.globalState();
}

@Override
public <X> void output(OutputTag<X> outputTag, X value) {
context.output(outputTag, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
* Base abstract class for functions that are evaluated over non-keyed windows using a context
Expand Down Expand Up @@ -77,5 +78,13 @@ public abstract class Context {
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();

/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,15 @@ public void process(final Context context, Iterable<T> input, Collector<R> out)
}

this.ctx.window = context.window();
this.ctx.windowState = context.windowState();
this.ctx.globalState = context.globalState();
this.ctx.context = context;

windowFunction.process(ctx, Collections.singletonList(curr), out);
}

@Override
public void clear(final Context context) throws Exception {
this.ctx.window = context.window();
this.ctx.windowState = context.windowState();
this.ctx.globalState = context.globalState();
this.ctx.context = context;

windowFunction.clear(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}

@Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.OutputTag;

/**
* Internal reusable context wrapper.
Expand Down Expand Up @@ -55,4 +56,9 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return internalContext.globalState();
}

@Override
public <X> void output(OutputTag<X> outputTag, X value) {
internalContext.output(outputTag, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala.function
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector

Expand Down Expand Up @@ -73,6 +74,11 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore

/**
* Emits a record to the side output identified by the [[OutputTag]].
*/
def output[X](outputTag: OutputTag[X], value: X)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def windowState = context.windowState()

override def globalState = context.globalState()

override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
}
func.process(ctx, elements.asScala, out)
}
Expand All @@ -138,6 +140,8 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
override def windowState = context.windowState()

override def globalState = context.globalState()

override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
}
func.clear(ctx)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,51 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase {
assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
sideOutputResultSink.getResult)
}

/**
* Test ProcessAllWindowFunction side output.
*/
@Test
def testProcessAllWindowFunctionSideOutput() {
val resultSink = new TestListResultSink[String]
val sideOutputResultSink = new TestListResultSink[String]

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))


val sideOutputTag = OutputTag[String]("side")

val windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestAssigner)
.windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
.process(new ProcessAllWindowFunction[(String, Int), String, TimeWindow] {
override def process(
context: Context,
elements: Iterable[(String, Int)],
out: Collector[String]): Unit = {
for (in <- elements) {
out.collect(in._1)
context.output(sideOutputTag, "sideout-" + in._1)
}
}
})

windowOperator
.getSideOutput(sideOutputTag)
.addSink(sideOutputResultSink)

windowOperator.addSink(resultSink)

env.execute()

assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
sideOutputResultSink.getResult)
}
}

class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -582,4 +583,40 @@ public void process(Integer integer, Context context, Iterable<Integer> elements
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}

@Test
public void testProcessAllWindowFunctionSideOutput() throws Exception {
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(1);
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Integer> dataStream = see.fromCollection(elements);

OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};

SingleOutputStreamOperator<Integer> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
private static final long serialVersionUID = 1L;

@Override
public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
for (Integer e : elements) {
out.collect(e);
context.output(sideOutputTag, "sideout-" + String.valueOf(e));
}
}
});

windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
windowOperator.addSink(resultSink);
see.execute();

assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}
}

0 comments on commit 39682c4

Please sign in to comment.