Skip to content

Commit

Permalink
[FLINK-2008] [FLINK-2296] Fix checkpoint committing & KafkaITCase
Browse files Browse the repository at this point in the history
This closes apache#895
  • Loading branch information
rmetzger authored and StephanEwen committed Jul 13, 2015
1 parent 8d1efa0 commit aa5e5b3
Show file tree
Hide file tree
Showing 23 changed files with 399 additions and 219 deletions.
3 changes: 3 additions & 0 deletions docs/apis/streaming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,9 @@ Another way of exposing user defined operator state for the Flink runtime for ch

When a user-defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods are invoked to draw and restore the function's state.

In addition, user functions can implement the `CheckpointNotifier` interface to receive notifications about completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
Note that a user function is not guaranteed to receive a notification if a failure happens between the completion of a checkpoint and the delivery of its notification. Notifications should therefore be handled in such a way that a notification for a later checkpoint subsumes any missed notifications for earlier checkpoints.

For example, the same counting reduce function shown for `OperatorState`s can be implemented using the `Checkpointed` interface instead:

{% highlight java %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -375,11 +373,8 @@ else if (checkpoint != null) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
StateForTask stateForTask = completed.getState(ev.getJobvertexId());
SerializedValue<StateHandle<?>> taskState = (stateForTask != null) ? stateForTask.getState() : null;
ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId,
timestamp, taskState);
ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId());
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@

package org.apache.flink.runtime.checkpoint;

import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

/**
* A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
Expand All @@ -41,20 +38,14 @@ public class SuccessfulCheckpoint {

private final long timestamp;

private final Map<JobVertexID, StateForTask> vertexToState;

private final List<StateForTask> states;
private final List<StateForTask> states;


public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) {
this.job = job;
this.checkpointID = checkpointID;
this.timestamp = timestamp;
this.states = states;
vertexToState = Maps.newHashMap();
for(StateForTask state : states){
vertexToState.put(state.getOperatorId(), state);
}
}

public JobID getJobId() {
Expand All @@ -73,17 +64,6 @@ public List<StateForTask> getStates() {
return states;
}

/**
* Returns the task state included in the checkpoint for a given JobVertexID if it exists or
* null if no state is included for that id.
*
* @param jobVertexID
* @return
*/
public StateForTask getState(JobVertexID jobVertexID) {
return vertexToState.get(jobVertexID);
}

// --------------------------------------------------------------------------------------------

public void discard(ClassLoader userClassLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public String getTaskNameWithSubtaskIndex() {
getTotalNumberOfParallelSubtasks());
}

/**
 * Gets the value held in this object's {@code subTaskIndex} field.
 *
 * @return the current {@code subTaskIndex}
 */
public int getSubTaskIndex() {
    return this.subTaskIndex;
}

/**
 * Reports the parallelism of the job vertex referenced by this object.
 *
 * @return the value returned by {@code jobVertex.getParallelism()}
 */
public int getTotalNumberOfParallelSubtasks() {
    final int parallelism = this.jobVertex.getParallelism();
    return parallelism;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.flink.runtime.jobgraph.tasks;

import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;

public interface CheckpointCommittingOperator {
public interface CheckpointNotificationOperator {

void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception;
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,22 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;

/**
* This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the
* {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint
* has been confirmed and that the task can commit the checkpoint to the outside world.
*/
public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
public class NotifyCheckpointComplete extends AbstractCheckpointMessage implements java.io.Serializable {

private static final long serialVersionUID = 2094094662279578953L;

/** The timestamp associated with the checkpoint */
private final long timestamp;

/** The stateHandle associated with the checkpoint confirmation message*/
private final SerializedValue<StateHandle<?>> state;

public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp,
SerializedValue<StateHandle<?>> state) {
public NotifyCheckpointComplete(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
super(job, taskExecutionId, checkpointId);
this.timestamp = timestamp;
this.state = state;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -53,14 +46,6 @@ public long getTimestamp() {

// --------------------------------------------------------------------------------------------

/**
* Returns the stateHandle that was included in the confirmed checkpoint for a given task or null
* if no state was committed in that checkpoint.
*/
public SerializedValue<StateHandle<?>> getState() {
return state;
}

@Override
public int hashCode() {
return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
Expand All @@ -71,8 +56,8 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
else if (o instanceof ConfirmCheckpoint) {
ConfirmCheckpoint that = (ConfirmCheckpoint) o;
else if (o instanceof NotifyCheckpointComplete) {
NotifyCheckpointComplete that = (NotifyCheckpointComplete) o;
return this.timestamp == that.timestamp && super.equals(o);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.memorymanager.MemoryManager;
Expand Down Expand Up @@ -883,11 +883,11 @@ public void run() {
checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp);
}
catch (Throwable t) {
logger.error("Error while triggering checkpoint for " + taskName, t);
failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
}
}
};
executeAsyncCallRunnable(runnable, "Checkpoint Trigger");
executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
}
else {
LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
Expand All @@ -899,30 +899,30 @@ public void run() {
}
}

public void confirmCheckpoint(final long checkpointID,
final SerializedValue<StateHandle<?>> state) {
public void notifyCheckpointComplete(final long checkpointID) {
AbstractInvokable invokable = this.invokable;

if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof CheckpointCommittingOperator) {
if (invokable instanceof CheckpointNotificationOperator) {

// build a local closure
final CheckpointCommittingOperator checkpointer = (CheckpointCommittingOperator) invokable;
final CheckpointNotificationOperator checkpointer = (CheckpointNotificationOperator) invokable;
final Logger logger = LOG;
final String taskName = taskNameWithSubtask;

Runnable runnable = new Runnable() {
@Override
public void run() {
try {
checkpointer.confirmCheckpoint(checkpointID, state);
checkpointer.notifyCheckpointComplete(checkpointID);
}
catch (Throwable t) {
logger.error("Error while confirming checkpoint for " + taskName, t);
// fail task if checkpoint confirmation failed.
failExternally(new RuntimeException("Error while confirming checkpoint", t));
}
}
};
executeAsyncCallRunnable(runnable, "Checkpoint Confirmation");
executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
}
else {
LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger

import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
Expand Down Expand Up @@ -425,17 +425,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
log.debug(s"Taskmanager received a checkpoint request for unknown task $taskExecutionId.")
}

case message: ConfirmCheckpoint =>
case message: NotifyCheckpointComplete =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
val state = message.getState

log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")

val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.confirmCheckpoint(checkpointId, state)
task.notifyCheckpointComplete(checkpointId)
} else {
log.debug(
s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.junit.Test;

Expand Down Expand Up @@ -199,8 +199,8 @@ public void testTriggerAndConfirmSimpleCheckpoint() {

// validate that the relevant tasks got a confirmation message
{
ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp, null);
ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp, null);
NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
}
Expand Down Expand Up @@ -237,8 +237,8 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));

ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew, null);
ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew, null);
NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
}
Expand Down Expand Up @@ -343,7 +343,7 @@ public void testMultipleConcurrentCheckpoints() {

// the first confirm message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1, null), commitAttemptID);
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);

// send the last remaining ack for the second checkpoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
Expand All @@ -355,7 +355,7 @@ public void testMultipleConcurrentCheckpoints() {

// the second commit message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);

// validate the committed checkpoints
List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {

// the first confirm message should be out
verify(commitVertex, times(1)).sendMessageToCurrentExecution(
new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID);
new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);

// send the last remaining ack for the first checkpoint. This should not do anything
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
Expand Down Expand Up @@ -551,7 +551,7 @@ public void testCheckpointTimeoutIsolated() {

// no confirm message must have been sent
verify(commitVertex, times(0))
.sendMessageToCurrentExecution(any(ConfirmCheckpoint.class), any(ExecutionAttemptID.class));
.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class));

coord.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
Expand All @@ -38,7 +38,7 @@ public class CheckpointMessagesTest {
@Test
public void testTriggerAndConfirmCheckpoint() {
try {
ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L, null);
NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
testSerializabilityEqualsHashCode(cc);

TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.memorymanager.MemoryManager;

import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.SerializedValue;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -134,7 +132,7 @@ public void testMixedAsyncCallsInOrder() {

for (int i = 1; i <= NUM_CALLS; i++) {
task.triggerCheckpointBarrier(i, 156865867234L);
task.confirmCheckpoint(i, null);
task.notifyCheckpointComplete(i);
}

triggerLatch.await();
Expand Down Expand Up @@ -186,7 +184,7 @@ private static Task createTask() {
}

public static class CheckpointsInOrderInvokable extends AbstractInvokable
implements CheckpointedOperator, CheckpointCommittingOperator {
implements CheckpointedOperator, CheckpointNotificationOperator {

private volatile long lastCheckpointId = 0;

Expand All @@ -213,7 +211,7 @@ public void invoke() throws Exception {
}

@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
public void triggerCheckpoint(long checkpointId, long timestamp) {
lastCheckpointId++;
if (checkpointId == lastCheckpointId) {
if (lastCheckpointId == NUM_CALLS) {
Expand All @@ -229,7 +227,7 @@ else if (this.error == null) {
}

@Override
public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception {
public void notifyCheckpointComplete(long checkpointId) {
if (checkpointId != lastCheckpointId && this.error == null) {
this.error = new Exception("calls out of order");
synchronized (this) {
Expand Down
Loading

0 comments on commit aa5e5b3

Please sign in to comment.