Skip to content

Commit

Permalink
[FLINK-3185] [runtime, tests] Log error on failure during recovery
Browse files Browse the repository at this point in the history
This closes apache#1472
  • Loading branch information
uce authored and StephanEwen committed Dec 26, 2015
1 parent 4fca1db commit 7cb25cb
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,14 @@ class JobManager(
leaderSessionID = newLeaderSessionID

// confirming the leader session ID might be blocking, thus do it in a future
future{
future {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)

// TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
// managers etc.)
if (recoveryMode != RecoveryMode.STANDALONE) {
log.info(s"Delaying recovery of all jobs for $delayBetweenRetries ms.")

context.system.scheduler.scheduleOnce(new FiniteDuration(delayBetweenRetries,
MILLISECONDS), self, decorateMessage(RecoverAllJobs))(context.dispatcher)
}
Expand Down Expand Up @@ -344,6 +346,10 @@ class JobManager(
submittedJobGraph.getJobInfo(),
isRecovery = true)
}
else {
log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " +
s"because it is already submitted.")
}

case RecoverJob(jobId) =>
future {
Expand Down Expand Up @@ -372,28 +378,32 @@ class JobManager(

case RecoverAllJobs =>
future {
// The ActorRef, which is part of the submitted job graph can only be de-serialized in the
// scope of an actor system.
akka.serialization.JavaSerializer.currentSystem.withValue(
context.system.asInstanceOf[ExtendedActorSystem]) {
try {
// The ActorRef, which is part of the submitted job graph can only be
// de-serialized in the scope of an actor system.
akka.serialization.JavaSerializer.currentSystem.withValue(
context.system.asInstanceOf[ExtendedActorSystem]) {

log.info(s"Recovering all jobs.")
log.info(s"Attempting to recover all jobs.")

val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala
val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala

if (!leaderElectionService.hasLeadership()) {
// we've lost leadership. mission: abort.
log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
s"jobs.")
}
else {
log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
if (!leaderElectionService.hasLeadership()) {
// we've lost leadership. mission: abort.
log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
s"jobs.")
}
else {
log.info(s"Re-submitting ${jobGraphs.size} job graphs.")

jobGraphs.foreach{
submittedJobGraph =>
self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
jobGraphs.foreach{
submittedJobGraph =>
self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
}
}
}
} catch {
case e: Exception => log.error("Fatal error: Failed to recover jobs.", e)
}
}(context.dispatcher)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ public void destroy() {
}
}

/**
 * Returns the string form of {@code processOutput}.
 *
 * @return {@code processOutput.toString()}, or {@code null} when
 *         {@code processOutput} is {@code null}
 */
public String getProcessOutput() {
    return processOutput == null ? null : processOutput.toString();
}

// ---------------------------------------------------------------------------------------------
// File based synchronization utilities
// ---------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
Expand Down Expand Up @@ -67,6 +69,7 @@
import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

public class JobManagerCheckpointRecoveryITCase extends TestLogger {

Expand Down Expand Up @@ -96,7 +99,7 @@ public static void tearDown() throws Exception {

@Before
public void cleanUp() throws Exception {
if (FileStateBackendBasePath != null) {
if (FileStateBackendBasePath != null && FileStateBackendBasePath.exists()) {
FileUtils.cleanDirectory(FileStateBackendBasePath);
}

Expand Down Expand Up @@ -270,6 +273,155 @@ public void testCheckpointedStreamingSumProgram() throws Exception {
}
}

/**
* Tests that the JobManager logs failures during recovery properly.
*
* <p>Scenario: two job manager processes are started with a ZooKeeper recovery mode
* configuration, a blocking job is submitted to the leader and awaited until it is
* {@code RUNNING}. Then the file state backend directory is deleted and the leading job
* manager process is destroyed. The test passes if the output of the remaining job manager
* process contains both {@code "Fatal error: Failed to recover jobs"} and
* {@code "java.io.FileNotFoundException"} before the test deadline expires.
*
* <p>On any failure the logs of both job manager processes are printed before the error is
* rethrown. All started processes, services and actor systems are shut down in the
* {@code finally} block.
*
* @throws Exception if any setup step fails or the expected log output is not found in time
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-3185">FLINK-3185</a>
*/
@Test
public void testCheckpointRecoveryFailure() throws Exception {
// A single deadline bounds all waiting steps of this test.
final Deadline testDeadline = TestTimeOut.fromNow();
final String zooKeeperQuorum = ZooKeeper.getConnectString();
final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString();

Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
zooKeeperQuorum,
fileStateBackendPath);

config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);

// Everything below is declared outside the try block so that the finally block
// can clean up whatever was actually started.
JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
LeaderRetrievalService leaderRetrievalService = null;
ActorSystem taskManagerSystem = null;
ActorSystem testActorSystem = null;

try {
// Test actor system
testActorSystem = AkkaUtils.createActorSystem(new Configuration(),
new Some<>(new Tuple2<String, Object>("localhost", 0)));

// The job managers
jobManagerProcess[0] = new JobManagerProcess(0, config);
jobManagerProcess[1] = new JobManagerProcess(1, config);

jobManagerProcess[0].createAndStart();
jobManagerProcess[1].createAndStart();

// Leader listener
TestingListener leaderListener = new TestingListener();
leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
leaderRetrievalService.start(leaderListener);

// The task manager
taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
TaskManager.startTaskManagerComponentsAndActor(
config, taskManagerSystem, "localhost",
Option.<String>empty(), Option.<LeaderRetrievalService>empty(),
false, TaskManager.class);

// Get the leader
leaderListener.waitForNewLeader(testDeadline.timeLeft().toMillis());

String leaderAddress = leaderListener.getAddress();
UUID leaderId = leaderListener.getLeaderSessionID();

// Get the leader ref
ActorRef leaderRef = AkkaUtils.getActorRef(
leaderAddress, testActorSystem, testDeadline.timeLeft());
ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);

// Who's the boss? Match the announced leader address against the Akka URL of the
// first process; if it does not match, the second process is taken to be the leader.
JobManagerProcess leadingJobManagerProcess;
JobManagerProcess nonLeadingJobManagerProcess;
if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
leadingJobManagerProcess = jobManagerProcess[0];
nonLeadingJobManagerProcess = jobManagerProcess[1];
}
else {
leadingJobManagerProcess = jobManagerProcess[1];
nonLeadingJobManagerProcess = jobManagerProcess[0];
}

// Blocking JobGraph
JobVertex blockingVertex = new JobVertex("Blocking vertex");
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
JobGraph jobGraph = new JobGraph(blockingVertex);

// Submit the job in detached mode
leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED));

// Wait for the job to be running
JobManagerActorTestUtils.waitForJobStatus(
jobGraph.getJobID(),
JobStatus.RUNNING,
leader,
testDeadline.timeLeft());

// Remove all files under the file state backend base path. The test expects the
// subsequent recovery on the other job manager to fail because of this.
FileUtils.deleteDirectory(FileStateBackendBasePath);

// Kill the leader
leadingJobManagerProcess.destroy();

// Verify that the job manager logs the failed recovery. We cannot
// do more at this point. :(
boolean success = false;

// Poll the output of the remaining process every 500 ms until both expected
// strings show up or the deadline expires.
while (testDeadline.hasTimeLeft()) {
String output = nonLeadingJobManagerProcess.getProcessOutput();

if (output != null) {
if (output.contains("Fatal error: Failed to recover jobs") &&
output.contains("java.io.FileNotFoundException")) {

success = true;
break;
}
}
else {
log.warn("No process output available.");
}

Thread.sleep(500);
}

assertTrue("Did not find expected output in logs.", success);
}
catch (Throwable t) {
// In case of an error, print the job manager process logs.
if (jobManagerProcess[0] != null) {
jobManagerProcess[0].printProcessLog();
}

if (jobManagerProcess[1] != null) {
jobManagerProcess[1].printProcessLog();
}

throw t;
}
finally {
// Tear down everything that was started; null checks cover the case where setup
// failed before a component was created.
if (jobManagerProcess[0] != null) {
jobManagerProcess[0].destroy();
}

if (jobManagerProcess[1] != null) {
jobManagerProcess[1].destroy();
}

if (leaderRetrievalService != null) {
leaderRetrievalService.stop();
}

if (taskManagerSystem != null) {
taskManagerSystem.shutdown();
}

if (testActorSystem != null) {
testActorSystem.shutdown();
}
}
}

// ---------------------------------------------------------------------------------------------

/**
Expand Down

0 comments on commit 7cb25cb

Please sign in to comment.