Skip to content

Commit

Permalink
[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction
Browse files Browse the repository at this point in the history
This brings it in line with KeyedBroadcastProcessFunction, which uses
context objects defined in KeyedBroadcastProcessFunction. The context
objects here have no added functionality but we still define them here
so that the methods don't refer to the base class implementations for
consistency.
  • Loading branch information
aljoscha committed Apr 12, 2018
1 parent 584229d commit 0838bbe
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadc
* to fail and go into recovery.
*/
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

/**
* A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of
* a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
*/
public abstract class Context extends BaseBroadcastProcessFunction.Context {}

/**
* A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of
* a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
*/
public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
Expand Down Expand Up @@ -113,7 +113,7 @@ public void processWatermark(Watermark mark) throws Exception {
currentWatermark = mark.getTimestamp();
}

private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context {
private class ReadWriteContextImpl extends Context {

private final ExecutionConfig config;

Expand Down

0 comments on commit 0838bbe

Please sign in to comment.