Skip to content

Commit

Permalink
[FLINK-9152] Harmonize BroadcastProcessFunction Context names
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Apr 12, 2018
1 parent d5d6842 commit 584229d
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
*
* <p>The user has to implement two methods:
* <ol>
* <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to
* <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
* each element in the broadcast side
* <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
* <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
* non-broadcasted/keyed side.
* </ol>
*
Expand Down Expand Up @@ -71,15 +71,15 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* The context is only valid during the invocation of this method, do not store it.
*
* @param value The stream element.
* @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
* @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
* querying the current processing/event time and iterating the broadcast state
* with <b>read-only</b> access.
* The context is only valid during the invocation of this method, do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

/**
* This method is called for each element in the
Expand All @@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;

/**
* Called when a timer set using {@link TimerService} fires.
Expand Down Expand Up @@ -130,7 +130,7 @@ public void onTimer(final long timestamp, final OnTimerContext ctx, final Collec
* this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys
* in the your backend.
*/
public abstract class KeyedContext extends Context {
public abstract class Context extends BaseBroadcastProcessFunction.Context {

/**
* Applies the provided {@code function} to the state
Expand All @@ -152,7 +152,7 @@ public abstract <VS, S extends State> void applyToKeyedState(
* this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the
* broadcast state and a {@link TimerService} for querying time and registering timers.
*/
public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {

/**
* A {@link TimerService} for querying time and registering timers.
Expand All @@ -163,7 +163,7 @@ public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class OnTimerContext extends KeyedReadOnlyContext {
public abstract class OnTimerContext extends ReadOnlyContext {

/**
* The {@link TimeDomain} of the firing timer, i.e. if it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
Expand Down Expand Up @@ -141,7 +142,8 @@ public void onProcessingTime(InternalTimer<KS, VoidNamespace> timer) throws Exce
onTimerContext.timer = null;
}

private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
private class ReadWriteContextImpl
extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {

private final ExecutionConfig config;

Expand Down Expand Up @@ -220,7 +222,7 @@ public <VS, S extends State> void applyToKeyedState(
}
}

private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
private class ReadOnlyContextImpl extends ReadOnlyContext {

private final ExecutionConfig config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,12 +879,12 @@ public long extractTimestamp(String element, long previousElementTimestamp) {
bcStream.process(
new KeyedBroadcastProcessFunction<String, Long, String, String>() {
@Override
public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// do nothing
}

@Override
public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// do nothing
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
}

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
ctx.applyToKeyedState(
listStateDesc,
Expand All @@ -158,7 +158,7 @@ public void process(String key, ListState<String> state) throws Exception {
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
getRuntimeContext().getListState(listStateDesc).add(value);
}
}
Expand Down Expand Up @@ -216,12 +216,12 @@ private static class FunctionWithTimerOnKeyed extends KeyedBroadcastProcessFunct
}

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ctx.timerService().registerEventTimeTimer(timerTS);
out.collect("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
Expand Down Expand Up @@ -289,12 +289,12 @@ private static class FunctionWithSideOutput extends KeyedBroadcastProcessFunctio
};

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
}
Expand Down Expand Up @@ -380,14 +380,14 @@ private static class FunctionWithBroadcastState extends KeyedBroadcastProcessFun
}

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
final String key = value + "." + keyPostfix;
ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value);
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
Iterable<Map.Entry<String, Integer>> broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
Iterator<Map.Entry<String, Integer>> iter = broadcastStateIt.iterator();

Expand Down Expand Up @@ -621,15 +621,15 @@ private static class TestFunctionWithOutput extends KeyedBroadcastProcessFunctio
}

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
for (String k : keysToRegister) {
ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, value);
}
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
out.collect(entry.toString());
}
Expand All @@ -652,12 +652,12 @@ public void testNoKeyedStateOnBroadcastSide() throws Exception {
private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);

@Override
public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
getRuntimeContext().getState(valueState).value(); // this should fail
}

@Override
public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// do nothing
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class TestBroadcastProcessFunction(
@throws[Exception]
override def processElement(
value: Long,
ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext,
ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#ReadOnlyContext,
out: Collector[String]): Unit = {

val currentTime = nextTimerTimestamp
Expand All @@ -121,7 +121,7 @@ class TestBroadcastProcessFunction(
@throws[Exception]
override def processBroadcastElement(
value: String,
ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext,
ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context,
out: Collector[String]): Unit = {

val key = value.split(":")(1).toLong
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
long currentTime = nextTimerTimestamp;
nextTimerTimestamp++;
ctx.timerService().registerEventTimeTimer(currentTime);
timerToExpectedKey.put(currentTime, value);
}

@Override
public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
long key = Long.parseLong(value.split(":")[1]);
ctx.getBroadcastState(descriptor).put(key, value);
}
Expand Down

0 comments on commit 584229d

Please sign in to comment.