Skip to content

Commit

Permalink
[FLINK-3669] Timer coalescing across keys and cleanup of unused trigg…
Browse files Browse the repository at this point in the history
…er tasks

Only one TriggerTask per timestamp is registered at
the runtime context. When the first timer for a timestamp is registered, a new
TriggerTask is scheduled. When no timer is registered anymore for a specific
timestamp, the corresponding trigger task is canceled and hence removed.

The ScheduledFutures to cancel trigger tasks are not checkpointed. So
cleanup of trigger tasks will not work after a failure.
  • Loading branch information
knaufk authored and aljoscha committed May 3, 2016
1 parent 47faa90 commit e7586c3
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -187,14 +188,14 @@ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> statePro
}

@Override
public void registerTimer(final long time, final Triggerable target) {
public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
if (timer == null) {
timer = Executors.newSingleThreadScheduledExecutor();
}

final long delay = Math.max(time - System.currentTimeMillis(), 0);

timer.schedule(new Runnable() {
return timer.schedule(new Runnable() {
@Override
public void run() {
synchronized (checkpointLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;

/**
* Base class for all stream operators. Operators that contain a user function should extend the class
Expand Down Expand Up @@ -246,8 +247,8 @@ public AbstractStateBackend getStateBackend() {
* @param time The absolute time in milliseconds.
* @param target The target to be triggered.
*/
protected void registerTimer(long time, Triggerable target) {
container.registerTimer(time, target);
protected ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return container.registerTimer(time, target);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -88,8 +89,8 @@ public InputSplitProvider getInputSplitProvider() {
* @param time The absolute time in milliseconds.
* @param target The target to be triggered.
*/
public void registerTimer(long time, Triggerable target) {
operator.registerTimer(time, target);
public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return operator.registerTimer(time, target);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.streaming.runtime.operators.windowing;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AppendingState;
Expand All @@ -39,6 +41,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
Expand All @@ -65,6 +68,7 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -134,6 +138,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected transient TimestampedCollector<OUT> timestampedCollector;

protected transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;

/**
* To keep track of the current watermark so that we can immediately fire if a trigger
* registers an event time callback for a timestamp that lies in the past.
Expand All @@ -149,8 +155,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/**
* Processing time timers that are currently in-flight.
*/
protected transient Set<Timer<K, W>> processingTimeTimers;
protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
protected transient Set<Timer<K, W>> processingTimeTimers;
protected transient Multiset<Long> processingTimeTimerTimestamps;

/**
* Current waiting watermark callbacks.
Expand Down Expand Up @@ -213,9 +220,13 @@ public final void open() throws Exception {
}
if (processingTimeTimers == null) {
processingTimeTimers = new HashSet<>();
processingTimeTimerTimestamps = HashMultiset.create();
processingTimeTimersQueue = new PriorityQueue<>(100);
}

//ScheduledFutures are not checkpointed
processingTimeTimerFutures = new HashMap<>();

context = new Context(null, null);

if (windowAssigner instanceof MergingWindowAssigner) {
Expand Down Expand Up @@ -424,6 +435,10 @@ private void processTriggersFor(Watermark mark) throws Exception {
public final void trigger(long time) throws Exception {
boolean fire;

//Remove information about the triggering task
processingTimeTimerFutures.remove(time);
processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

do {
Timer<K, W> timer = processingTimeTimersQueue.peek();
if (timer != null && timer.timestamp <= time) {
Expand Down Expand Up @@ -525,9 +540,14 @@ public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescri
/**
 * Registers a processing-time timer for the current key and window.
 *
 * <p>Timers are coalesced per timestamp: a trigger task is scheduled at the runtime
 * context only when the first timer for {@code time} is added. Its future is kept in
 * {@code processingTimeTimerFutures} so the task can be cancelled once the last timer
 * for that timestamp is deleted.
 *
 * @param time the absolute processing time, in milliseconds, at which to fire
 */
@Override
public void registerProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);
    // make sure we only put one timer per key into the queue
    if (processingTimeTimers.add(timer)) {
        processingTimeTimersQueue.add(timer);
        // Multiset.add returns the count *before* the addition, so 0 means this is the
        // first timer for this timestamp and a TriggerTask still has to be scheduled.
        // No unconditional registerTimer call here: it would schedule one untracked,
        // uncancellable task per timer and defeat the coalescing.
        if (processingTimeTimerTimestamps.add(time, 1) == 0) {
            ScheduledFuture<?> scheduledFuture =
                    getRuntimeContext().registerTimer(time, WindowOperator.this);
            processingTimeTimerFutures.put(time, scheduledFuture);
        }
    }
}

Expand All @@ -542,15 +562,25 @@ public void registerEventTimeTimer(long time) {
// immediately schedule a trigger, so that we don't wait for the next
// watermark update to fire the watermark trigger
getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
//No need to put it in processingTimeTimerFutures as this timer is never removed
}
}

/**
 * Deletes the processing-time timer of the current key and window for {@code time}.
 *
 * <p>If the timer was the last one registered for that timestamp, the shared trigger
 * task is cancelled as well. Deleting a timer that was never registered is a no-op.
 *
 * @param time the absolute processing time, in milliseconds, of the timer to delete
 */
@Override
public void deleteProcessingTimeTimer(long time) {
    Timer<K, W> timer = new Timer<>(time, key, window);

    // Only touch the per-timestamp bookkeeping when this key/window actually owned a
    // timer. Otherwise the count would be decremented on behalf of another key and the
    // shared trigger task could be cancelled while timers are still pending.
    if (!processingTimeTimers.remove(timer)) {
        return;
    }
    processingTimeTimersQueue.remove(timer);

    // Multiset.remove returns the count *before* the removal, so 1 means no timers are
    // left for this timestamp and the TriggerTask can be cancelled.
    if (processingTimeTimerTimestamps.remove(time, 1) == 1) {
        // The future may be absent: futures are not checkpointed, so none exists for
        // timestamps restored from a snapshot.
        ScheduledFuture<?> triggerTaskFuture = processingTimeTimerFutures.remove(time);
        if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) {
            triggerTaskFuture.cancel(false);
        }
    }
}

@Override
Expand Down Expand Up @@ -592,6 +622,7 @@ public String toString() {
}
}


/**
* Internal class for keeping track of in-flight timers.
*/
Expand Down Expand Up @@ -670,19 +701,7 @@ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp)
AbstractStateBackend.CheckpointStateOutputView out =
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

out.writeInt(watermarkTimersQueue.size());
for (Timer<K, W> timer : watermarkTimersQueue) {
keySerializer.serialize(timer.key, out);
windowSerializer.serialize(timer.window, out);
out.writeLong(timer.timestamp);
}

out.writeInt(processingTimeTimers.size());
for (Timer<K, W> timer : processingTimeTimersQueue) {
keySerializer.serialize(timer.key, out);
windowSerializer.serialize(timer.window, out);
out.writeLong(timer.timestamp);
}
snapshotTimers(out);

taskState.setOperatorState(out.closeAndGetHandle());

Expand All @@ -699,6 +718,10 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro
StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
DataInputView in = inputState.getState(userClassloader);

restoreTimers(in);
}

private void restoreTimers(DataInputView in ) throws IOException {
int numWatermarkTimers = in.readInt();
watermarkTimers = new HashSet<>(numWatermarkTimers);
watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
Expand All @@ -712,15 +735,45 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro
}

int numProcessingTimeTimers = in.readInt();
processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
processingTimeTimers = new HashSet<>();
for (int i = 0; i < numProcessingTimeTimers; i++) {
K key = keySerializer.deserialize(in);
W window = windowSerializer.deserialize(in);
long timestamp = in.readLong();
Timer<K, W> timer = new Timer<>(timestamp, key, window);
processingTimeTimers.add(timer);
processingTimeTimersQueue.add(timer);
processingTimeTimers.add(timer);
}

int numProcessingTimeTimerTimestamp = in.readInt();
processingTimeTimerTimestamps = HashMultiset.create();
for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) {
long timestamp = in.readLong();
int count = in.readInt();
processingTimeTimerTimestamps.add(timestamp, count);
}
}

/**
 * Writes all in-flight timers to the given output view.
 *
 * <p>Layout: the number of watermark timers followed by each timer, the number of
 * processing-time timers followed by each timer, then the number of distinct
 * processing-time timestamps followed by (timestamp, count) pairs.
 *
 * @param out the view the timer state is written to
 * @throws IOException if writing to {@code out} fails
 */
private void snapshotTimers(DataOutputView out) throws IOException {
    // event-time (watermark) timers
    out.writeInt(watermarkTimersQueue.size());
    for (Timer<K, W> watermarkTimer : watermarkTimersQueue) {
        serializeTimer(watermarkTimer, out);
    }

    // processing-time timers
    out.writeInt(processingTimeTimers.size());
    for (Timer<K, W> processingTimer : processingTimeTimers) {
        serializeTimer(processingTimer, out);
    }

    // number of registered timers per processing-time timestamp
    Set<Multiset.Entry<Long>> timestampCounts = processingTimeTimerTimestamps.entrySet();
    out.writeInt(timestampCounts.size());
    for (Multiset.Entry<Long> entry : timestampCounts) {
        out.writeLong(entry.getElement());
        out.writeInt(entry.getCount());
    }
}

/** Writes a single timer as key, window, timestamp. */
private void serializeTimer(Timer<K, W> timer, DataOutputView out) throws IOException {
    keySerializer.serialize(timer.key, out);
    windowSerializer.serialize(timer.window, out);
    out.writeLong(timer.timestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@

package org.apache.flink.streaming.runtime.tasks;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand All @@ -41,23 +31,32 @@
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
Expand Down Expand Up @@ -129,7 +128,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
private ClassLoader userClassLoader;

/** The executor service that schedules and calls the triggers of this task*/
private ScheduledExecutorService timerService;
private ScheduledThreadPoolExecutor timerService;

/** The map of user-defined accumulators of this task */
private Map<String, Accumulator<?, ?>> accumulatorMap;
Expand Down Expand Up @@ -191,8 +190,9 @@ public final void invoke() throws Exception {
headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
}

timerService = Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
timerService =new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
// allow trigger tasks to be removed if all timers for that timestamp are removed by user
timerService.setRemoveOnCancelPolicy(true);

// task specific initialization
init();
Expand Down Expand Up @@ -663,13 +663,13 @@ public AbstractStateBackend createStateBackend(String operatorIdentifier, TypeSe
/**
* Registers a timer.
*/
public void registerTimer(final long timestamp, final Triggerable target) {
public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) {
long delay = Math.max(timestamp - System.currentTimeMillis(), 0);

timerService.schedule(
new TriggerTask(this, lock, target, timestamp),
delay,
TimeUnit.MILLISECONDS);
return timerService.schedule(
new TriggerTask(this, lock, target, timestamp),
delay,
TimeUnit.MILLISECONDS);
}

/**
Expand Down
Loading

0 comments on commit e7586c3

Please sign in to comment.