Skip to content

Commit

Permalink
[FLINK-7720] [checkpoints] Centralize creation of backends and state …
Browse files Browse the repository at this point in the history
…related resources

This closes apache#4745.
  • Loading branch information
StefanRRichter committed Jan 22, 2018
1 parent 402a2e3 commit 517b3f8
Show file tree
Hide file tree
Showing 83 changed files with 3,256 additions and 1,182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
Expand Down Expand Up @@ -816,7 +817,12 @@ private MockRuntimeContext(

super(
new MockStreamOperator(),
new MockEnvironment("mockTask", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
new MockEnvironment(
"mockTask",
4 * MemoryManager.DEFAULT_PAGE_SIZE,
null,
16,
new TestTaskStateManager()),
Collections.emptyMap());

this.isCheckpointingEnabled = isCheckpointingEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
Expand All @@ -42,10 +43,12 @@
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
Expand Down Expand Up @@ -139,21 +142,15 @@ public String getKey(String value) throws Exception {
final OneShotLatch delayCheckpointLatch = new OneShotLatch();
final OneShotLatch ensureCheckpointLatch = new OneShotLatch();

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize) {
CheckpointResponder checkpointResponderMock = new CheckpointResponder() {

@Override
public void acknowledgeCheckpoint(
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot checkpointStateHandles) {

super.acknowledgeCheckpoint(checkpointId, checkpointMetrics);

JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
// block on the latch, to verify that triggerCheckpoint returns below,
// even though the async checkpoint would not finish
try {
Expand All @@ -163,7 +160,7 @@ public void acknowledgeCheckpoint(
}

boolean hasManagedKeyedState = false;
for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
for (Map.Entry<OperatorID, OperatorSubtaskState> entry : subtaskState.getSubtaskStateMappings()) {
OperatorSubtaskState state = entry.getValue();
if (state != null) {
hasManagedKeyedState |= state.getManagedKeyedState() != null;
Expand All @@ -176,8 +173,30 @@ public void acknowledgeCheckpoint(
// we now know that the checkpoint went through
ensureCheckpointLatch.trigger();
}

@Override
public void declineCheckpoint(
JobID jobID, ExecutionAttemptID executionAttemptID,
long checkpointId, Throwable cause) {

}
};

JobID jobID = new JobID();
ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(0L, 0L);
TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager(
jobID,
executionAttemptID,
checkpointResponderMock);

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize,
taskStateManagerTestMock);

testHarness.invoke(mockEnv);

final OneInputStreamTask<String, String> task = testHarness.getTask();
Expand Down Expand Up @@ -269,12 +288,15 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS
streamConfig.setStreamOperator(new AsyncCheckpointOperator());
streamConfig.setOperatorID(new OperatorID());

TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager();

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize);
testHarness.bufferSize,
taskStateManagerTestMock);

blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
Expand Down Expand Up @@ -375,11 +376,13 @@ public Map<String, Object> getComponentConfiguration() {
when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());

final CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(execConfig);
when(mockTask.getCancelables()).thenReturn(closeableRegistry);

return mockTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,44 @@

package org.apache.flink.util;

import org.apache.flink.annotation.Internal;
import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
* Tagging interface for migration related classes.
* This interface represents an iterable that is also closeable.
*
* @param <T> type of the iterated objects.
*/
@Internal
public interface Migration {
public interface CloseableIterable<T> extends Iterable<T>, Closeable {

/**
* Empty iterator.
*/
class Empty<T> implements CloseableIterable<T> {

private Empty() {
}

@Override
public void close() throws IOException {

}

@Nonnull
@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
}

/**
* Returns an empty iterator.
*/
static <T> CloseableIterable<T> empty() {
return new CloseableIterable.Empty<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.Serializable;

/**
* This class encapsulates the data from the job manager to restore a task.
*/
public class JobManagerTaskRestore implements Serializable {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;

Expand Down Expand Up @@ -145,6 +146,8 @@ public interface Environment {

BroadcastVariableManager getBroadcastVariableManager();

TaskStateManager getTaskStateManager();

/**
* Return the registry for accumulators which are periodically sent to the job manager.
* @return the registry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile Throwable failureCause; // once assigned, never changes

/** Information to restore the task on recovery, such as checkpoint id and task state snapshot */
@Nullable
private volatile JobManagerTaskRestore taskRestore;

// ------------------------ Accumulators & Metrics ------------------------
Expand Down Expand Up @@ -316,6 +317,7 @@ public boolean isFinished() {
return state.isTerminal();
}

@Nullable
public JobManagerTaskRestore getTaskRestore() {
return taskRestore;
}
Expand All @@ -326,7 +328,7 @@ public JobManagerTaskRestore getTaskRestore() {
*
* @param taskRestore information to restore the state
*/
public void setInitialState(JobManagerTaskRestore taskRestore) {
public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
this.taskRestore = taskRestore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private StreamCompressionDecorator determineStreamCompression(ExecutionConfig ex
@Override
public void dispose() {

IOUtils.closeQuietly(this);
IOUtils.closeQuietly(cancelStreamRegistry);

if (kvStateRegistry != null) {
kvStateRegistry.unregisterAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.flink.runtime.state;


import org.apache.flink.annotation.PublicEvolving;

/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
* participants.
*/
@PublicEvolving
public interface CheckpointListener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void close() throws IOException {

@Override
public void dispose() {
IOUtils.closeQuietly(this);
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
registeredStates.clear();
}

Expand Down
Loading

0 comments on commit 517b3f8

Please sign in to comment.