Skip to content

Commit

Permalink
[FLINK-3258] [runtime, streaming-java, tests] Move registerInputOutpu…
Browse files Browse the repository at this point in the history
…t code to invoke and remove registerInputOutput

This closes apache#1538
  • Loading branch information
uce committed Jan 27, 2016
1 parent 92a2544 commit f681d9b
Show file tree
Hide file tree
Showing 29 changed files with 220 additions and 478 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ public void run() {
// --------------------------------------------------------------------------------------------

public static class TestInvokable extends AbstractInvokable {
@Override
public void registerInputOutput() {}

@Override
public void invoke() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,12 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen

private final AtomicBoolean terminated = new AtomicBoolean(false);


// --------------------------------------------------------------------------------------------

@Override
public void registerInputOutput() {
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
}

@Override
public void invoke() throws Exception {
TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());

// store all aggregators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,33 @@
import org.slf4j.LoggerFactory;

/**
* This is the abstract base class for every task that can be executed by a TaskManager.
* Concrete tasks like the stream vertices of the batch tasks
* (see {@link BatchTask}) inherit from this class.
* This is the abstract base class for every task that can be executed by a
* TaskManager. Concrete tasks like the vertices of batch jobs (see
* {@link BatchTask} inherit from this class.
*
* The TaskManager invokes the methods {@link #registerInputOutput()} and {@link #invoke()} in
* this order when executing a task. The first method is responsible for setting up input and
* output stream readers and writers, the second method contains the task's core operation.
* <p>The TaskManager invokes the {@link #invoke()} method when executing a
* task. All operations of the task happen in this method (setting up input
* output stream readers and writers as well as the task's core operation).
*/
public abstract class AbstractInvokable {

private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokable.class);


/** The environment assigned to this invokable. */
private Environment environment;

/** The execution config, cached from the deserialization from the JobConfiguration */
private ExecutionConfig executionConfig;


/**
* Must be overwritten by the concrete task to instantiate the required record reader and record writer.
*/
public abstract void registerInputOutput() throws Exception;

/**
* Must be overwritten by the concrete task. This method is called by the task manager
* when the actual execution of the task starts.
* Starts the execution.
*
* <p>Must be overwritten by the concrete task implementation. This method
* is called by the task manager when the actual execution of the task
* starts.
*
* <p>All resources should be cleaned up when the method returns. Make sure
* to guard the code with <code>try-finally</code> blocks where necessary.
*
* @throws Exception
* Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
Expand Down Expand Up @@ -89,7 +88,6 @@ public ClassLoader getUserCodeClassLoader() {
return getEnvironment().getUserClassLoader();
}


/**
* Returns the current number of subtasks the respective task is split into.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,14 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
// Task Interface
// --------------------------------------------------------------------------------------------


/**
* Initialization method. Runs in the execution graph setup phase in the JobManager
* and as a setup method on the TaskManager.
* The main work method.
*/
@Override
public void registerInputOutput() throws Exception {
public void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Start registering input and output."));
}
Expand All @@ -247,21 +248,14 @@ public void registerInputOutput() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Finished registering input and output."));
}
}


/**
* The main work method.
*/
@Override
public void invoke() throws Exception {

// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Start task code."));
}

Environment env = getEnvironment();

this.runtimeUdfContext = createRuntimeContext();

// whatever happens in this scope, make sure that the local strategies are cleaned up!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ public class DataSinkTask<IT> extends AbstractInvokable {
private volatile boolean cleanupCalled;

@Override
public void registerInputOutput() {

public void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
LOG.debug(getLogString("Start registering input and output"));

// initialize OutputFormat
initOutputFormat();

// initialize input readers
try {
initInputReaders();
Expand All @@ -99,12 +101,10 @@ public void registerInputOutput() {
}

LOG.debug(getLogString("Finished registering input and output"));
}


@Override
public void invoke() throws Exception
{
// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data sink operator"));

if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,26 @@ public class DataSourceTask<OT> extends AbstractInvokable {
private volatile boolean taskCanceled = false;

@Override
public void registerInputOutput() {
public void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
initInputFormat();

LOG.debug(getLogString("Start registering input and output"));

try {
initOutputs(getUserCodeClassLoader());
} catch (Exception ex) {
throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
ex.getMessage(), ex);
throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
ex.getMessage(), ex);
}

LOG.debug(getLogString("Finished registering input and output"));
}


@Override
public void invoke() throws Exception {

// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data source operator"));

if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,18 +505,12 @@ else if (current == ExecutionState.CANCELING) {

// let the task code create its readers and writers
invokable.setEnvironment(env);
try {
invokable.registerInputOutput();
}
catch (Exception e) {
throw new Exception("Call to registerInputOutput() of invokable failed", e);
}

// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint

// get our private reference onto the stack (be safe against concurrent changes)
// get our private reference onto the stack (be safe against concurrent changes)
SerializedValue<StateHandle<?>> operatorState = this.operatorState;
long recoveryTs = this.recoveryTs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,6 @@ public static class BlockingTask extends AbstractInvokable {
private volatile static int HasBlockedExecution = 0;
private static Object waitLock = new Object();

@Override
public void registerInputOutput() throws Exception {
// Nothing to do
}

@Override
public void invoke() throws Exception {
if (BlockExecution > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ public void testPartialConsumePipelinedResultReceiver() throws Exception {
*/
public static class SlowBufferSender extends AbstractInvokable {

@Override
public void registerInputOutput() {
// Nothing to do
}

@Override
public void invoke() throws Exception {
final ResultPartitionWriter writer = getEnvironment().getWriter(0);
Expand All @@ -130,11 +125,6 @@ public void invoke() throws Exception {
*/
public static class SingleBufferReceiver extends AbstractInvokable {

@Override
public void registerInputOutput() {
// Nothing to do
}

@Override
public void invoke() throws Exception {
final BufferReader reader = new BufferReader(getEnvironment().getInputGate(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,21 @@
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.util.event.EventListener;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkElementIndex;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

Expand All @@ -51,8 +58,37 @@ public TestSingleInputGate(int numberOfInputChannels) {
public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
checkArgument(numberOfInputChannels >= 1);

this.inputGate = spy(new SingleInputGate(
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class)));
SingleInputGate realGate = new SingleInputGate(
"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class));

this.inputGate = spy(realGate);

// Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask).
// After merging registerInputOutput and invoke, we have to make sure that the test
// notifcations happen at the expected time. In real programs, this is guaranteed by
// the instantiation and request partition life cycle.
try {
Field f = realGate.getClass().getDeclaredField("inputChannelsWithData");
f.setAccessible(true);
final BlockingQueue<InputChannel> notifications = (BlockingQueue<InputChannel>) f.get(realGate);

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.callRealMethod();

if (!notifications.isEmpty()) {
EventListener<InputGate> listener = (EventListener<InputGate>) invocation.getArguments()[0];
listener.onEvent(inputGate);
}

return null;
}
}).when(inputGate).registerListener(any(EventListener.class));
}
catch (Exception e) {
throw new RuntimeException(e);
}

this.inputChannels = new TestInputChannel[numberOfInputChannels];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,11 @@ public static class RoundRobinSubtaskIndexSender extends AbstractInvokable {

public final static String CONFIG_KEY = "number-of-times-to-send";

private RecordWriter<IntValue> writer;

private int numberOfTimesToSend;

@Override
public void registerInputOutput() {
writer = new RecordWriter<IntValue>(getEnvironment().getWriter(0));
numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
}

@Override
public void invoke() throws Exception {
RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);

final IntValue subtaskIndex = new IntValue(
getEnvironment().getTaskInfo().getIndexOfThisSubtask());

Expand All @@ -154,26 +147,16 @@ public static class SubtaskIndexReceiver extends AbstractInvokable {

public final static String CONFIG_KEY = "number-of-indexes-to-receive";

private RecordReader<IntValue> reader;

private int numberOfSubtaskIndexesToReceive;

/** Each set bit position corresponds to a received subtask index */
private BitSet receivedSubtaskIndexes;

@Override
public void registerInputOutput() {
reader = new RecordReader<IntValue>(
public void invoke() throws Exception {
RecordReader<IntValue> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
IntValue.class);

numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
}

@Override
public void invoke() throws Exception {
try {
final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);

IntValue record;

int numberOfReceivedSubtaskIndexes = 0;
Expand Down
Loading

0 comments on commit f681d9b

Please sign in to comment.