Skip to content

Commit

Permalink
[FLINK-3688] WindowOperator.trigger() does not emit Watermark anymore
Browse files Browse the repository at this point in the history
Conflicts:
	flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
  • Loading branch information
knaufk authored and aljoscha committed Apr 8, 2016
1 parent 9f52422 commit a234719
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ protected void processTriggerResult(TriggerResult triggerResult, K key, W window

@Override
public final void processWatermark(Watermark mark) throws Exception {
processTriggersFor(mark);

output.emitWatermark(mark);

this.currentWatermark = mark.getTimestamp();
}

private void processTriggersFor(Watermark mark) throws Exception {
boolean fire;

do {
Expand All @@ -360,10 +368,6 @@ public final void processWatermark(Watermark mark) throws Exception {
fire = false;
}
} while (fire);

output.emitWatermark(mark);

this.currentWatermark = mark.getTimestamp();
}

@Override
Expand Down Expand Up @@ -391,7 +395,7 @@ public final void trigger(long time) throws Exception {
// Also check any watermark timers. We might have some in here since
// Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
// that is already behind the watermark.
processWatermark(new Watermark(currentWatermark));
processTriggersFor(new Watermark(currentWatermark));
}

/**
Expand Down

0 comments on commit a234719

Please sign in to comment.