From c2989f2b1839055858e4b328473d0a8313094ff3 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 9 Oct 2015 00:50:07 +0200 Subject: [PATCH] [FLINK-2354] [runtime] Remove state changing futures in JobManager Internal actor states must only be modified within the actor thread. This avoids all the well-known issues coming with concurrency. Fix RemoveCachedJob by introducing RemoveJob Fix JobManagerITCase Add removeJob which maintains the job in the SubmittedJobGraphStore Make revokeLeadership not remove the jobs from the state backend Fix shading problem with curator by hiding CuratorFramework in ChaosMonkeyITCase --- flink-runtime/pom.xml | 4 +- .../flink/runtime/jobmanager/JobManager.scala | 262 ++++++++++-------- .../runtime/messages/JobManagerMessages.scala | 17 ++ ...nagerSubmittedJobGraphsRecoveryITCase.java | 5 +- .../zookeeper/ZooKeeperTestEnvironment.java | 10 + .../src/test/resources/log4j-test.properties | 2 +- .../runtime/jobmanager/JobManagerITCase.scala | 2 +- .../flink-shaded-curator-recipes/pom.xml | 78 ++++++ .../flink-shaded-curator-test/pom.xml | 86 ++++++ flink-shaded-curator/pom.xml | 82 ++---- flink-tests/pom.xml | 7 + .../test/recovery/ChaosMonkeyITCase.java | 15 +- .../yarn/YARNHighAvailabilityITCase.java | 13 +- .../flink/yarn/TestingYarnJobManager.scala | 10 +- 14 files changed, 404 insertions(+), 189 deletions(-) create mode 100644 flink-shaded-curator/flink-shaded-curator-recipes/pom.xml create mode 100644 flink-shaded-curator/flink-shaded-curator-test/pom.xml diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index f79c5edd8db4e..9db82b28aa3ec 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -182,7 +182,7 @@ under the License. org.apache.flink - flink-shaded-curator + flink-shaded-curator-recipes ${project.version} @@ -417,7 +417,7 @@ under the License. - org.apache.flink:flink-shaded-curator + org.apache.flink:flink-shaded-curator-recipes diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f3e4054a33dbe..eef28d83ad2c6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -64,7 +64,6 @@ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMess import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils} import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool @@ -127,6 +126,9 @@ class JobManager( var leaderSessionID: Option[UUID] = None + /** Futures which have to be completed before terminating the job manager */ + var futuresToComplete: Option[Seq[Future[Unit]]] = None + /** * Run when the job manager is started. Simply logs an informational message. * The method also starts the leader election service. @@ -163,7 +165,16 @@ class JobManager( override def postStop(): Unit = { log.info(s"Stopping JobManager ${getAddress}.") - cancelAndClearEverything(new Exception("The JobManager is shutting down.")) + val newFuturesToComplete = cancelAndClearEverything( + new Exception("The JobManager is shutting down."), + true) + + implicit val executionContext = context.dispatcher + + val futureToComplete = Future.sequence( + futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) + + Await.ready(futureToComplete, timeout) // disconnect the registered task managers instanceManager.getAllRegisteredInstances.asScala.foreach { @@ -235,9 +246,11 @@ class JobManager( case RevokeLeadership => log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") - future { - cancelAndClearEverything(new Exception("JobManager is no longer the leader.")) - }(context.dispatcher) + val newFuturesToComplete = cancelAndClearEverything( + new Exception("JobManager is no longer the leader."), + false) + + futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) // disconnect the registered task managers instanceManager.getAllRegisteredInstances.asScala.foreach { @@ -315,9 +328,15 @@ class JobManager( val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(), jobGraph.getSessionTimeout) - future { - submitJob(jobGraph, jobInfo) - }(context.dispatcher) + submitJob(jobGraph, jobInfo) + + case RecoverSubmittedJob(submittedJobGraph) => + if (!currentJobs.contains(submittedJobGraph.getJobId)) { + submitJob( + submittedJobGraph.getJobGraph(), + submittedJobGraph.getJobInfo(), + isRecovery = true) + } case RecoverJob(jobId) => future { @@ -328,19 +347,18 @@ class JobManager( log.info(s"Attempting to recover job $jobId.") - val jobGraph = submittedJobGraphs.recoverJobGraph(jobId) + val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) - if (jobGraph.isDefined) { - if (!leaderElectionService.hasLeadership()) { - // we've lost leadership. mission: abort. - log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") - } - else { - recoverJobGraph(jobGraph.get) - } - } - else { - log.warn(s"Failed to recover job graph ${jobId}.") + submittedJobGraphOption match { + case Some(submittedJobGraph) => + if (!leaderElectionService.hasLeadership()) { + // we've lost leadership. mission: abort. + log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") + } + else { + self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) + } + case None => log.warn(s"Failed to recover job graph $jobId.") } } }(context.dispatcher) @@ -362,7 +380,10 @@ class JobManager( else { log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.") - jobGraphs.foreach(recoverJobGraph(_)) + jobGraphs.foreach{ + submittedJobGraph => + self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) + } } } }(context.dispatcher) @@ -473,7 +494,7 @@ class JobManager( if (newJobStatus.isTerminalState()) { jobInfo.end = timeStamp - future { + future{ // TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will // linger around and potentially be recovered at a later time. There is nothing we // can do about that, but it should be communicated with the Client. @@ -483,11 +504,11 @@ class JobManager( context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) { // remove only if no activity occurred in the meantime if (lastActivity == jobInfo.lastActive) { - removeJob(jobID) + self ! decorateMessage(RemoveJob(jobID, true)) } - } + }(context.dispatcher) } else { - removeJob(jobID) + self ! decorateMessage(RemoveJob(jobID, true)) } // is the client waiting for the job result? @@ -539,9 +560,7 @@ class JobManager( }(context.dispatcher) } case None => - future { - removeJob(jobID) - }(context.dispatcher) + self ! decorateMessage(RemoveJob(jobID, true)) } case ScheduleOrUpdateConsumers(jobId, partitionId) => @@ -646,9 +665,7 @@ class JobManager( case Heartbeat(instanceID, metricsReport, accumulators) => log.debug(s"Received hearbeat message from $instanceID.") - Future { - updateAccumulators(accumulators) - }(context.dispatcher) + updateAccumulators(accumulators) instanceManager.reportHeartBeat(instanceID, metricsReport) @@ -671,11 +688,26 @@ class JobManager( case RequestJobManagerStatus => sender() ! decorateMessage(JobManagerStatusAlive) + case RemoveJob(jobID, clearPersistedJob) => + currentJobs.get(jobID) match { + case Some((graph, info)) => + removeJob(graph.getJobID, clearPersistedJob) match { + case Some(futureToComplete) => + futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) + case None => + } + case None => + } + case RemoveCachedJob(jobID) => currentJobs.get(jobID) match { case Some((graph, info)) => if (graph.getState.isTerminalState) { - removeJob(graph.getJobID) + removeJob(graph.getJobID, true) match { + case Some(futureToComplete) => + futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) + case None => + } } else { // triggers removal upon completion of job info.sessionAlive = false @@ -761,6 +793,7 @@ class JobManager( jobGraph.getClasspaths, userCodeLoader) + currentJobs.put(jobGraph.getJobID, (graph, jobInfo)) graph } @@ -878,22 +911,6 @@ class JobManager( executionGraph.registerExecutionListener(gateway) executionGraph.registerJobStatusListener(gateway) } - - if (isRecovery) { - executionGraph.restoreLatestCheckpointedState() - } - else { - submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) - } - - // Add the job graph only after everything is finished. Otherwise there can be races in - // tests, which check the currentJobs (for example before killing a JM). - if (!currentJobs.contains(jobId)) { - currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo)) - } - - // done with submitting the job - jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) } catch { case t: Throwable => @@ -916,20 +933,39 @@ class JobManager( return } - if (leaderElectionService.hasLeadership) { - // There is a small chance that multiple job managers schedule the same job after if they - // try to recover at the same time. This will eventually be noticed, but can not be ruled - // out from the beginning. - - // NOTE: Scheduling the job for execution is a separate action from the job submission. - // The success of submitting the job must be independent from the success of scheduling - // the job. + // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously + // because it is a blocking operation + future { try { - log.info(s"Scheduling job $jobId ($jobName).") + if (isRecovery) { + executionGraph.restoreLatestCheckpointedState() + } + else { + submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) + } - executionGraph.scheduleForExecution(scheduler) - } - catch { + jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) + + if (leaderElectionService.hasLeadership) { + // There is a small chance that multiple job managers schedule the same job after if + // they try to recover at the same time. This will eventually be noticed, but can not be + // ruled out from the beginning. + + // NOTE: Scheduling the job for execution is a separate action from the job submission. + // The success of submitting the job must be independent from the success of scheduling + // the job. + log.info(s"Scheduling job $jobId ($jobName).") + + executionGraph.scheduleForExecution(scheduler) + } else { + // Remove the job graph. Otherwise it will be lingering around and possibly removed from + // ZooKeeper by this JM. + self ! decorateMessage(RemoveJob(jobId, false)) + + log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " + + "this. I am not scheduling the job for execution.") + } + } catch { case t: Throwable => try { executionGraph.fail(t) } @@ -939,27 +975,6 @@ class JobManager( } } } - } - else { - // Remove the job graph. Otherwise it will be lingering around and possibly removed from - // ZooKeeper by this JM. - currentJobs.remove(jobId) - - log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " + - "this. I am not scheduling the job for execution.") - } - } - } - - /** - * Submits the job if it is not already one of our current jobs. - * - * @param jobGraph Job to recover - */ - private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = { - if (!currentJobs.contains(jobGraph.getJobId)) { - future { - submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true) }(context.dispatcher) } } @@ -1169,20 +1184,24 @@ class JobManager( * might block. Therefore be careful not to block the actor thread. * * @param jobID ID of the job to remove and archive + * @param removeJobFromStateBackend true if the job shall be archived and removed from the state + * backend */ - private def removeJob(jobID: JobID): Unit = { - currentJobs.synchronized { - // Don't remove the job yet... - currentJobs.get(jobID) match { - case Some((eg, _)) => - try { - // ...otherwise, we can have lingering resources when there is a concurrent shutdown - // and the ZooKeeper client is closed. Not removing the job immediately allow the - // shutdown to release all resources. - submittedJobGraphs.removeJobGraph(jobID) - } catch { - case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) - } + private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = { + // Don't remove the job yet... + val futureOption = currentJobs.get(jobID) match { + case Some((eg, _)) => + val result = if (removeJobFromStateBackend) { + val futureOption = Some(future { + try { + // ...otherwise, we can have lingering resources when there is a concurrent shutdown + // and the ZooKeeper client is closed. Not removing the job immediately allow the + // shutdown to release all resources. + submittedJobGraphs.removeJobGraph(jobID) + } catch { + case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) + } + }(context.dispatcher)) try { eg.prepareForArchiving() @@ -1193,9 +1212,15 @@ class JobManager( "archiving.", t) } - currentJobs.remove(jobID) - case None => - } + futureOption + } else { + None + } + + currentJobs.remove(jobID) + + result + case None => None } try { @@ -1204,6 +1229,8 @@ class JobManager( case t: Throwable => log.error(s"Could not properly unregister job $jobID form the library cache.", t) } + + futureOption } /** Fails all currently running jobs and empties the list of currently running jobs. If the @@ -1211,26 +1238,35 @@ class JobManager( * * @param cause Cause for the cancelling. */ - private def cancelAndClearEverything(cause: Throwable) { - for ((jobID, (eg, jobInfo)) <- currentJobs) { - try { - submittedJobGraphs.removeJobGraph(jobID) - } - catch { - case t: Throwable => { - log.error("Error during submitted job graph clean up.", t) + private def cancelAndClearEverything( + cause: Throwable, + removeJobFromStateBackend: Boolean) + : Seq[Future[Unit]] = { + val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { + future { + if (removeJobFromStateBackend) { + try { + submittedJobGraphs.removeJobGraph(jobID) + } + catch { + case t: Throwable => { + log.error("Error during submitted job graph clean up.", t) + } + } } - } - eg.fail(cause) + eg.fail(cause) - if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { - jobInfo.client ! decorateMessage( - Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))) - } + if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) { + jobInfo.client ! decorateMessage( + Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))) + } + }(context.dispatcher) } currentJobs.clear() + + futures.toSeq } override def grantLeadership(newLeaderSessionID: UUID): Unit = { @@ -1285,7 +1321,9 @@ class JobManager( case accumulatorEvent => currentJobs.get(accumulatorEvent.getJobID) match { case Some((jobGraph, jobInfo)) => - jobGraph.updateAccumulators(accumulatorEvent) + future { + jobGraph.updateAccumulators(accumulatorEvent) + }(context.dispatcher) case None => // ignore accumulator values for old job } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index d77662242e198..8097bdce89ff4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra import org.apache.flink.runtime.instance.{InstanceID, Instance} import org.apache.flink.runtime.io.network.partition.ResultPartitionID import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph import org.apache.flink.runtime.util.SerializedThrowable import scala.collection.JavaConverters._ @@ -72,6 +73,14 @@ object JobManagerMessages { */ case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID + /** + * Triggers the submission of the recovered job + * + * @param submittedJobGraph Contains the submitted JobGraph and the associated JobInfo + */ + case class RecoverSubmittedJob(submittedJobGraph: SubmittedJobGraph) + extends RequiresLeaderSessionID + /** * Triggers recovery of all available jobs. */ @@ -286,6 +295,14 @@ object JobManagerMessages { */ case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse + /** Triggers the removal of the job with the given job ID + * + * @param jobID + * @param removeJobFromStateBackend true if the job has properly finished + */ + case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true) + extends RequiresLeaderSessionID + /** * Removes the job belonging to the job identifier from the job manager and archives it. * @param jobID The job identifier diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java index ac250bd82a1e9..e6156e5312bd6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java @@ -194,7 +194,9 @@ public void testSubmitJobToNonLeader() throws Exception { JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, leadingJobManager, deadline.timeLeft()); - // Make sure that the **non-leading** JM has actually removed the job graph from her + log.info("Wait that the non-leader removes the submitted job."); + + // Make sure that the **non-leading** JM has actually removed the job graph from its // local state. boolean success = false; while (!success && deadline.hasTimeLeft()) { @@ -205,6 +207,7 @@ public void testSubmitJobToNonLeader() throws Exception { success = true; } else { + log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString()); Thread.sleep(100); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java index 7ae89d16f66b0..94e198805179e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ZooKeeperUtils; +import java.util.List; + /** * Simple ZooKeeper and CuratorFramework setup for tests. */ @@ -111,6 +113,14 @@ public CuratorFramework getClient() { return client; } + public String getClientNamespace() { + return client.getNamespace(); + } + + public List getChildren(String path) throws Exception { + return client.getChildren().forPath(path); + } + /** * Creates a new client for the started ZooKeeper server/cluster. */ diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 76b237e7e92a2..1ca02aaa9b9ef 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console # ----------------------------------------------------------------------------- # Console (use 'console') diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 3a252f8cfe9ec..0f800c992969b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -657,7 +657,7 @@ class JobManagerITCase(_system: ActorSystem) jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self) expectMsg(JobSubmitSuccess(jobGraph2.getJobID)) - // job stil running + // job still running jm.tell(RemoveCachedJob(jobGraph2.getJobID), self) expectMsgType[JobResultSuccess] diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml new file mode 100644 index 0000000000000..c0a2adc5c8c9d --- /dev/null +++ b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml @@ -0,0 +1,78 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-shaded-curator + 0.10-SNAPSHOT + .. + + + flink-shaded-curator-recipes + flink-shaded-curator-recipes + + jar + + + + + org.apache.curator + curator-recipes + ${curator.version} + + + + + com.google.guava + guava + ${guava.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + log4j + org.slf4j:slf4j-log4j12 + + + + + + + + + diff --git a/flink-shaded-curator/flink-shaded-curator-test/pom.xml b/flink-shaded-curator/flink-shaded-curator-test/pom.xml new file mode 100644 index 0000000000000..2700c0c3fa6ba --- /dev/null +++ b/flink-shaded-curator/flink-shaded-curator-test/pom.xml @@ -0,0 +1,86 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-shaded-curator + 0.10-SNAPSHOT + .. + + + flink-shaded-curator-test + flink-shaded-curator-test + + jar + + + + + org.apache.curator + curator-test + ${curator.version} + + + + + com.google.guava + guava + ${guava.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + log4j + org.slf4j:slf4j-log4j12 + + + org.apache.curator:curator-test + + + + + org.apache.curator + org.apache.flink.shaded.org.apache.curator + + + + + + + + + diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml index ac62cc8c97b14..29d646184007f 100644 --- a/flink-shaded-curator/pom.xml +++ b/flink-shaded-curator/pom.xml @@ -1,22 +1,21 @@ + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> @@ -29,50 +28,13 @@ under the License. .. + + flink-shaded-curator-recipes + flink-shaded-curator-test + + flink-shaded-curator flink-shaded-curator - jar - - - - - org.apache.curator - curator-recipes - ${curator.version} - - - - - com.google.guava - guava - ${guava.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - shade-flink - package - - shade - - - - - log4j - org.slf4j:slf4j-log4j12 - - - - - - - - + pom diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 0dd20b1533a33..b9bae6ff918b4 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -147,6 +147,13 @@ under the License. test + + org.apache.flink + flink-shaded-curator-test + ${project.version} + test + + org.scalatest scalatest_${scala.binary.version} diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index a0c831238c474..2cdf83c20e119 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -523,22 +523,19 @@ private void submitJobGraph( } private void checkCleanRecoveryState(Configuration config) throws Exception { - LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + LOG.info("Checking " + ZooKeeper.getClientNamespace() + ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); - List jobGraphs = ZooKeeper.getClient().getChildren() - .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + List jobGraphs = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); assertEquals("Unclean job graphs: " + jobGraphs, 0, jobGraphs.size()); - LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + LOG.info("Checking " + ZooKeeper.getClientNamespace() + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); - List checkpoints = ZooKeeper.getClient().getChildren() - .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); + List checkpoints = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); assertEquals("Unclean checkpoints: " + checkpoints, 0, checkpoints.size()); - LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + LOG.info("Checking " + ZooKeeper.getClientNamespace() + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); - List checkpointCounter = ZooKeeper.getClient().getChildren() - .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); + List checkpointCounter = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); assertEquals("Unclean checkpoint counter: " + checkpointCounter, 0, checkpointCounter.size()); LOG.info("ZooKeeper state is clean"); diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 94d0a81a75b38..a05621a985d5e 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -22,6 +22,7 @@ import akka.actor.PoisonPill; import akka.testkit.JavaTestKit; import org.apache.curator.test.TestingServer; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -37,7 +38,9 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import scala.concurrent.duration.FiniteDuration; import java.io.File; @@ -53,6 +56,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { private static final int numberApplicationAttempts = 10; + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + @BeforeClass public static void setup() { actorSystem = AkkaUtils.createDefaultActorSystem(); @@ -102,9 +108,14 @@ public void testMultipleAMKill() throws Exception { String confDirPath = System.getenv("FLINK_CONF_DIR"); flinkYarnClient.setConfigurationDirectory(confDirPath); + String fsStateHandlePath = tmp.getRoot().getPath(); + flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" + - zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts); + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + + "@@" + ConfigConstants.STATE_BACKEND_FS_DIR + "=" + fsStateHandlePath + "/checkpoints" + + "@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); AbstractFlinkYarnCluster yarnCluster = null; diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 83d1f3c2fa54d..fa7003953ac19 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -21,8 +21,10 @@ package org.apache.flink.yarn import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.StreamingMode +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.testingUtils.TestingJobManagerLike @@ -60,7 +62,9 @@ class TestingYarnJobManager( delayBetweenRetries: Long, timeout: FiniteDuration, mode: StreamingMode, - leaderElectionService: LeaderElectionService) + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory) extends YarnJobManager( flinkConfiguration, executionContext, @@ -72,7 +76,9 @@ class TestingYarnJobManager( delayBetweenRetries, timeout, mode, - leaderElectionService) + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) with TestingJobManagerLike { override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]