Skip to content

Commit

Permalink
[FLINK-5982] [runtime] Refactor AbstractInvokable and StatefulTask to…
Browse files Browse the repository at this point in the history
… accept Environment and State in the constructor.

This is the first step towards implementing an RAII pattern for all task runtime classes.

This closes apache#3633
  • Loading branch information
tony810430 authored and StephanEwen committed Jan 9, 2018
1 parent 4c3f607 commit 6033de0
Show file tree
Hide file tree
Showing 79 changed files with 1,196 additions and 617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
@Test
public void testFullyAsyncSnapshot() throws Exception {

final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();

final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

testHarness.configureForKeyedStream(new KeySelector<String, String>() {
Expand Down Expand Up @@ -179,6 +179,8 @@ public void acknowledgeCheckpoint(

testHarness.invoke(mockEnv);

final OneInputStreamTask<String, String> task = testHarness.getTask();

// wait for the task to be running
for (Field field: StreamTask.class.getDeclaredFields()) {
if (field.getName().equals("isRunning")) {
Expand Down Expand Up @@ -213,13 +215,13 @@ public void acknowledgeCheckpoint(

/**
* This test ensures that canceling of asynchronous snapshots works as expected and does not block.
* @throws Exception
*/
@Test
public void testCancelFullyAsyncCheckpoints() throws Exception {
final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.setupOutputForSingletonOperatorChain();

testHarness.configureForKeyedStream(new KeySelector<String, String>() {
Expand Down Expand Up @@ -278,6 +280,8 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS

testHarness.invoke(mockEnv);

final OneInputStreamTask<String, String> task = testHarness.getTask();

// wait for the task to be running
for (Field field: StreamTask.class.getDeclaredFields()) {
if (field.getName().equals("isRunning")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down Expand Up @@ -120,24 +122,20 @@ public void testJobManagerJMXMetricAccess() throws Exception {
* Utility to block/unblock a task.
*/
public static class BlockingInvokable extends AbstractInvokable {

    // Single gate shared by all instances: invoke() parks on it until unblock() triggers it.
    // This replaces the former boolean flag + wait()/notifyAll() pair, which checked the flag
    // outside the monitor and could therefore miss a notification issued between the check
    // and the call to wait().
    // NOTE(review): assumes OneShotLatch.await() returns immediately once the latch has been
    // triggered - confirm against OneShotLatch.
    private static final OneShotLatch LATCH = new OneShotLatch();

    /**
     * Creates the invokable and passes the environment on to the superclass constructor.
     *
     * @param environment The environment assigned to this invokable.
     */
    public BlockingInvokable(Environment environment) {
        super(environment);
    }

    /**
     * Blocks the calling thread by awaiting the shared latch; returns once {@link #unblock()}
     * has triggered it.
     *
     * @throws Exception Forwarded from awaiting the latch (for example, if the thread is interrupted).
     */
    @Override
    public void invoke() throws Exception {
        LATCH.await();
    }

    /**
     * Triggers the shared latch, releasing the tasks blocked in {@link #invoke()}.
     */
    public static void unblock() {
        LATCH.trigger();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

import static org.apache.flink.util.Preconditions.checkNotNull;

import java.io.Serializable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;

/**
* Options for performing the checkpoint.
*
* <p>The {@link CheckpointProperties} are related and cover properties that
* are only relevant at the {@link CheckpointCoordinator}. These options are
* relevant at the {@link StatefulTask} instances running on task managers.
* relevant at the {@link AbstractInvokable} instances running on task managers.
*/
public class CheckpointOptions implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
*/
public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
implements Terminable {

private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);

protected LongSumAggregator worksetAggregator;
Expand All @@ -86,6 +87,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc

private volatile boolean terminationRequested;

// --------------------------------------------------------------------------------------------

/**
* Creates the iterative task and hands the given environment to the superclass
* ({@code BatchTask}) constructor. No other initialization happens here.
*
* @param environment The environment assigned to this invokable.
*/
public AbstractIterativeTask(Environment environment) {
super(environment);
}

// --------------------------------------------------------------------------------------------
// Main life cycle methods that implement the iterative behavior
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
Expand Down Expand Up @@ -100,6 +101,17 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte

// --------------------------------------------------------------------------------------------

/**
* Creates the iteration head task and hands the given environment to the
* superclass constructor. No other initialization happens here.
*
* @param environment The environment assigned to this invokable.
*/
public IterationHeadTask(Environment environment) {
super(environment);
}

// --------------------------------------------------------------------------------------------

@Override
protected int getNumTaskInputs() {
// this task has an additional input in the workset case for the initial solution set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
Expand Down Expand Up @@ -49,6 +50,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI

private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;

// --------------------------------------------------------------------------------------------

/**
* Creates the iteration intermediate task and hands the given environment to the
* superclass constructor. No other initialization happens here.
*
* @param environment The environment assigned to this invokable.
*/
public IterationIntermediateTask(Environment environment) {
super(environment);
}

// --------------------------------------------------------------------------------------------

@Override
protected void initialize() throws Exception {
super.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
Expand Down Expand Up @@ -73,6 +74,17 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen

// --------------------------------------------------------------------------------------------

/**
* Creates the synchronization sink task and hands the given environment to the
* superclass ({@code AbstractInvokable}) constructor. No other initialization happens here.
*
* @param environment The environment assigned to this invokable.
*/
public IterationSynchronizationSinkTask(Environment environment) {
super(environment);
}

// --------------------------------------------------------------------------------------------

@Override
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
Expand Down Expand Up @@ -47,6 +48,19 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative

private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;

// --------------------------------------------------------------------------------------------

/**
* Creates the iteration tail task and hands the given environment to the
* superclass constructor. No other initialization happens here.
*
* @param environment The environment assigned to this invokable.
*/
public IterationTailTask(Environment environment) {
super(environment);
}

// --------------------------------------------------------------------------------------------

@Override
protected void initialize() throws Exception {
super.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,53 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.BatchTask;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This is the abstract base class for every task that can be executed by a
* TaskManager. Concrete tasks like the vertices of batch jobs (see
* {@link BatchTask} inherit from this class.
* This is the abstract base class for every task that can be executed by a TaskManager.
* Concrete tasks extend this class, for example the streaming and batch tasks.
*
* <p>The TaskManager invokes the {@link #invoke()} method when executing a
* task. All operations of the task happen in this method (setting up input
* output stream readers and writers as well as the task's core operation).
*
* <p>All classes that extend must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
* Tasks that are always stateless can, for convenience, also only implement the constructor
* {@code MyTask(Environment)}.
*
* <p><i>Developer note: While constructors cannot be enforced at compile time, we did not yet venture
* on the endeavor of introducing factories (it is only an internal API after all, and with Java 8,
* one can use {@code Class::new} almost like a factory lambda).</i>
*
* <p><b>NOTE:</b> There is no constructor that accepts an initial task state snapshot
* and stores it in a variable. That is on purpose, because the AbstractInvokable itself
* does not need the state snapshot (only subclasses such as StreamTask do need the state)
* and we do not want to store a reference indefinitely, thus preventing cleanup of
* the initial state structure by the Garbage Collector.
*
* <p>Any subclass that supports recoverable state and participates in
* checkpointing needs to override {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
* {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)},
* {@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointComplete(long)}.
*/
public abstract class AbstractInvokable {

/** The environment assigned to this invokable. */
private Environment environment;
private final Environment environment;

/**
* Creates an invokable task and stores its environment.
*
* <p>The argument is run through {@code checkNotNull} before it is assigned
* (presumably a {@code null} environment is rejected with an exception - confirm
* against {@code Preconditions.checkNotNull}).
*
* @param environment The environment assigned to this invokable.
*/
public AbstractInvokable(Environment environment) {
this.environment = checkNotNull(environment);
}

/**
* Starts the execution.
Expand All @@ -46,7 +77,7 @@ public abstract class AbstractInvokable {
*
* <p>All resources should be cleaned up when the method returns. Make sure
* to guard the code with <code>try-finally</code> blocks where necessary.
*
*
* @throws Exception
* Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
*/
Expand All @@ -62,16 +93,6 @@ public abstract class AbstractInvokable {
public void cancel() throws Exception {
// The default implementation does nothing.
}

/**
* Sets the environment of this task.
*
* @param environment
* the environment of this task
*/
public final void setEnvironment(Environment environment) {
this.environment = environment;
}

/**
* Returns the environment of this task.
Expand Down Expand Up @@ -133,4 +154,65 @@ public Configuration getJobConfiguration() {
public ExecutionConfig getExecutionConfig() {
return this.environment.getExecutionConfig();
}

// ------------------------------------------------------------------------
// Checkpointing Methods
// ------------------------------------------------------------------------

/**
 * Triggers a checkpoint; called asynchronously by the checkpoint coordinator.
 *
 * <p>This entry point is meant for the tasks that start a checkpoint by injecting the initial
 * barriers, i.e., the source tasks. Downstream operators, whose checkpoints result from
 * receiving checkpoint barriers, are instead driven through
 * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)}.
 *
 * <p>The implementation in this class does not support the operation and always throws an
 * {@link UnsupportedOperationException} naming the concrete task class.
 *
 * @param checkpointMetaData Meta data about this checkpoint
 * @param checkpointOptions Options for performing this checkpoint
 *
 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 * @throws Exception Declared so that overriding implementations may forward their exceptions.
 */
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
    final String taskClassName = getClass().getName();
    throw new UnsupportedOperationException("triggerCheckpoint not supported by " + taskClassName);
}

/**
 * Called when a checkpoint is triggered as a result of receiving checkpoint barriers on all
 * input streams.
 *
 * <p>The implementation in this class does not support the operation and always throws an
 * {@link UnsupportedOperationException} naming the concrete task class.
 *
 * @param checkpointMetaData Meta data about this checkpoint
 * @param checkpointOptions Options for performing this checkpoint
 * @param checkpointMetrics Metrics about this checkpoint
 *
 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 */
public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
    final String taskClassName = getClass().getName();
    throw new UnsupportedOperationException("triggerCheckpointOnBarrier not supported by " + taskClassName);
}

/**
 * Aborts a checkpoint after possibly some checkpoint barriers, but at least one
 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}, have been received.
 *
 * <p>Implementing tasks are required to forward a
 * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
 *
 * <p>The implementation in this class does not support the operation and always throws an
 * {@link UnsupportedOperationException} naming the concrete task class.
 *
 * @param checkpointId The ID of the checkpoint to be aborted.
 * @param cause The reason why the checkpoint was aborted during alignment
 * @throws Exception Declared so that overriding implementations may forward their exceptions.
 */
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
    final String taskClassName = getClass().getName();
    throw new UnsupportedOperationException("abortCheckpointOnBarrier not supported by " + taskClassName);
}

/**
 * Invoked once a checkpoint has been completed, i.e., when the checkpoint coordinator has
 * received the notification from all participating tasks.
 *
 * <p>The implementation in this class does not support the operation and always throws an
 * {@link UnsupportedOperationException} naming the concrete task class.
 *
 * @param checkpointId The ID of the checkpoint that is complete.
 * @throws Exception The notification method may forward its exceptions.
 */
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    final String taskClassName = getClass().getName();
    throw new UnsupportedOperationException("notifyCheckpointComplete not supported by " + taskClassName);
}
}
Loading

0 comments on commit 6033de0

Please sign in to comment.