Skip to content

Commit

Permalink
[FLINK-7748][network] Properly use the TaskEventDispatcher for subscr…
Browse files Browse the repository at this point in the history
…ibing to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

This commit also exposes the TaskEventDispatcher through a task's Environment so
that tasks are able to work with it (only IterationHeadTask does so far).

This closes apache#4761.
  • Loading branch information
Nico Kruber authored and zentol committed Dec 12, 2017
1 parent c5efb1f commit 175e1b3
Show file tree
Hide file tree
Showing 20 changed files with 358 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -209,4 +210,6 @@ public interface Environment {
InputGate getInputGate(int index);

InputGate[] getAllInputGates();

TaskEventDispatcher getTaskEventDispatcher();
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void registerTask(Task task) throws IOException {
}

// Register writer with task event dispatcher
taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
taskEventDispatcher.registerPartition(writer.getPartitionId());
}

// Setup the buffer pool for each buffer reader
Expand Down Expand Up @@ -266,7 +266,7 @@ public void unregisterTask(Task task) {
ResultPartitionWriter[] writers = task.getAllWriters();
if (writers != null) {
for (ResultPartitionWriter writer : writers) {
taskEventDispatcher.unregisterWriter(writer);
taskEventDispatcher.unregisterPartition(writer.getPartitionId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,126 @@
package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.event.EventListener;

import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The task event dispatcher dispatches events flowing backwards from a consuming task to the task
* producing the consumed result.
*
* <p> Backwards events only work for tasks, which produce pipelined results, where both the
* <p>Backwards events only work for tasks, which produce pipelined results, where both the
* producing and consuming task are running at the same time.
*/
public class TaskEventDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(TaskEventDispatcher.class);

private final Map<ResultPartitionID, ResultPartitionWriter> registeredWriters = Maps.newHashMap();
private final Map<ResultPartitionID, TaskEventHandler> registeredHandlers = new HashMap<>();

public void registerWriterForIncomingTaskEvents(ResultPartitionID partitionId, ResultPartitionWriter writer) {
synchronized (registeredWriters) {
if (registeredWriters.put(partitionId, writer) != null) {
throw new IllegalStateException("Already registered at task event dispatcher.");
/**
* Registers the given partition for incoming task events allowing calls to {@link
* #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
*
* @param partitionId
* the partition ID
*/
public void registerPartition(ResultPartitionID partitionId) {
    checkNotNull(partitionId);

    synchronized (registeredHandlers) {
        LOG.debug("registering {}", partitionId);

        // Install a fresh handler first and inspect what it displaced afterwards; note that
        // the new handler stays in the map even when the duplicate check below throws.
        final TaskEventHandler previousHandler =
            registeredHandlers.put(partitionId, new TaskEventHandler());

        final boolean alreadyRegistered = previousHandler != null;
        if (alreadyRegistered) {
            throw new IllegalStateException(
                "Partition " + partitionId + " already registered at task event dispatcher.");
        }
    }
}

public void unregisterWriter(ResultPartitionWriter writer) {
synchronized (registeredWriters) {
registeredWriters.remove(writer.getPartitionId());
/**
* Removes the given partition from listening to incoming task events, thus forbidding calls to
* {@link #subscribeToEvent(ResultPartitionID, EventListener, Class)}.
*
* @param partitionId
* the partition ID
*/
public void unregisterPartition(ResultPartitionID partitionId) {
checkNotNull(partitionId);

synchronized (registeredHandlers) {
LOG.debug("unregistering {}", partitionId);
// NOTE: tolerate un-registration of a non-registered partition: removing an absent key
// is simply a no-op here, so no exception is raised. This leniency is intentional because
// unregister is always called in the cleanup phase of a task, even if the task never got
// as far as registering (see Task.java).
registeredHandlers.remove(partitionId);
}
}

/**
* Publishes the event to the registered {@link ResultPartitionWriter} instances.
* <p>
* This method is either called directly from a {@link LocalInputChannel} or the network I/O
* Subscribes a listener to this dispatcher for events on a partition.
*
* @param partitionId
* ID of the partition to subscribe for (must be registered via {@link
* #registerPartition(ResultPartitionID)} first!)
* @param eventListener
* the event listener to subscribe
* @param eventType
* event type to subscribe to
*/
public void subscribeToEvent(
        ResultPartitionID partitionId,
        EventListener<TaskEvent> eventListener,
        Class<? extends TaskEvent> eventType) {
    checkNotNull(partitionId);
    checkNotNull(eventListener);
    checkNotNull(eventType);

    // The registry is a plain HashMap that registerPartition(), unregisterPartition() and
    // clearAll() only mutate while holding its monitor. Reading it without that monitor
    // would race with those writers, so the lookup takes the same lock.
    final TaskEventHandler taskEventHandler;
    synchronized (registeredHandlers) {
        taskEventHandler = registeredHandlers.get(partitionId);
    }

    if (taskEventHandler == null) {
        throw new IllegalStateException(
            "Partition " + partitionId + " not registered at task event dispatcher.");
    }

    // Subscribe after releasing the lock so the registry monitor is held only for the lookup.
    taskEventHandler.subscribe(eventListener, eventType);
}

/**
* Publishes the event to the registered {@link EventListener} instances.
*
* <p>This method is either called directly from a {@link LocalInputChannel} or the network I/O
* thread on behalf of a {@link RemoteInputChannel}.
*
* @return whether the event was published to a registered event handler (initiated via {@link
* #registerPartition(ResultPartitionID)}) or not
*/
public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
EventListener<TaskEvent> listener = registeredWriters.get(partitionId);
checkNotNull(partitionId);
checkNotNull(event);

if (listener != null) {
listener.onEvent(event);
// NOTE(review): this lookup reads registeredHandlers without holding its monitor, whereas
// registerPartition(), unregisterPartition() and clearAll() all mutate the map inside
// synchronized (registeredHandlers) - confirm whether publish can run concurrently with
// (un)registration and, if so, guard this read with the same lock.
TaskEventHandler taskEventHandler = registeredHandlers.get(partitionId);

if (taskEventHandler != null) {
// A handler exists for the partition: hand the event over and report success.
taskEventHandler.publish(event);
return true;
}

// No handler is registered for this partition, so the event is not delivered.
return false;
}

public void clearAll() {
synchronized (registeredWriters) {
registeredWriters.clear();
}
}

/**
* Returns the number of currently registered writers.
* Removes all registered event handlers.
*/
int getNumberOfRegisteredWriters() {
synchronized (registeredWriters) {
return registeredWriters.size();
public void clearAll() {
synchronized (registeredHandlers) {
registeredHandlers.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

package org.apache.flink.runtime.io.network.api.writer;

import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.util.event.EventListener;

import java.io.IOException;

Expand All @@ -34,12 +31,10 @@
* The {@link ResultPartitionWriter} is the runtime API for producing results. It
* supports two kinds of data to be sent: buffers and events.
*/
public class ResultPartitionWriter implements EventListener<TaskEvent> {
public class ResultPartitionWriter {

private final ResultPartition partition;

private final TaskEventHandler taskEventHandler = new TaskEventHandler();

public ResultPartitionWriter(ResultPartition partition) {
this.partition = partition;
}
Expand Down Expand Up @@ -94,17 +89,4 @@ public void writeBufferToAllChannels(final Buffer eventBuffer) throws IOExceptio
eventBuffer.recycle();
}
}

// ------------------------------------------------------------------------
// Event handling
// ------------------------------------------------------------------------

public void subscribeToEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
taskEventHandler.subscribe(eventListener, eventType);
}

@Override
public void onEvent(TaskEvent event) {
taskEventHandler.publish(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
import org.apache.flink.runtime.iterative.concurrent.Broker;
Expand Down Expand Up @@ -223,8 +225,10 @@ private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIte

private SuperstepBarrier initSuperstepBarrier() {
SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
TaskEventDispatcher taskEventDispatcher = getEnvironment().getTaskEventDispatcher();
ResultPartitionID partitionId = toSync.getPartitionId();
taskEventDispatcher.subscribeToEvent(partitionId, barrier, AllWorkersDoneEvent.class);
taskEventDispatcher.subscribeToEvent(partitionId, barrier, TerminationEvent.class);
return barrier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -69,6 +70,8 @@ public class RuntimeEnvironment implements Environment {

private final ResultPartitionWriter[] writers;
private final InputGate[] inputGates;

private final TaskEventDispatcher taskEventDispatcher;

private final CheckpointResponder checkpointResponder;

Expand Down Expand Up @@ -101,6 +104,7 @@ public RuntimeEnvironment(
Map<String, Future<Path>> distCacheEntries,
ResultPartitionWriter[] writers,
InputGate[] inputGates,
TaskEventDispatcher taskEventDispatcher,
CheckpointResponder checkpointResponder,
TaskManagerRuntimeInfo taskManagerInfo,
TaskMetricGroup metrics,
Expand All @@ -123,6 +127,7 @@ public RuntimeEnvironment(
this.distCacheEntries = checkNotNull(distCacheEntries);
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
this.checkpointResponder = checkNotNull(checkpointResponder);
this.taskManagerInfo = checkNotNull(taskManagerInfo);
this.containingTask = containingTask;
Expand Down Expand Up @@ -236,6 +241,11 @@ public InputGate[] getAllInputGates() {
return inputGates;
}

@Override
public TaskEventDispatcher getTaskEventDispatcher() {
return taskEventDispatcher;
}

@Override
public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,12 +667,28 @@ else if (current == ExecutionState.CANCELING) {
.createKvStateTaskRegistry(jobId, getJobVertexId());

Environment env = new RuntimeEnvironment(
jobId, vertexId, executionId, executionConfig, taskInfo,
jobConfiguration, taskConfiguration, userCodeClassLoader,
memoryManager, ioManager, broadcastVariableManager,
accumulatorRegistry, kvStateRegistry, inputSplitProvider,
distributedCacheEntries, writers, inputGates,
checkpointResponder, taskManagerConfig, metrics, this);
jobId,
vertexId,
executionId,
executionConfig,
taskInfo,
jobConfiguration,
taskConfiguration,
userCodeClassLoader,
memoryManager,
ioManager,
broadcastVariableManager,
accumulatorRegistry,
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
writers,
inputGates,
network.getTaskEventDispatcher(),
checkpointResponder,
taskManagerConfig,
metrics,
this);

// let the task code create its readers and writers
invokable.setEnvironment(env);
Expand Down
Loading

0 comments on commit 175e1b3

Please sign in to comment.