diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f79c5edd8db4e..9db82b28aa3ec 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -182,7 +182,7 @@ under the License.
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-curator</artifactId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
 			<version>${project.version}</version>
@@ -417,7 +417,7 @@ under the License.
-								<include>org.apache.flink:flink-shaded-curator</include>
+								<include>org.apache.flink:flink-shaded-curator-recipes</include>
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f3e4054a33dbe..eef28d83ad2c6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMess
import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils}
import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
@@ -127,6 +126,9 @@ class JobManager(
var leaderSessionID: Option[UUID] = None
+ /** Futures which have to be completed before terminating the job manager */
+ var futuresToComplete: Option[Seq[Future[Unit]]] = None
+
/**
* Run when the job manager is started. Simply logs an informational message.
* The method also starts the leader election service.
@@ -163,7 +165,16 @@ class JobManager(
override def postStop(): Unit = {
log.info(s"Stopping JobManager ${getAddress}.")
- cancelAndClearEverything(new Exception("The JobManager is shutting down."))
+ val newFuturesToComplete = cancelAndClearEverything(
+ new Exception("The JobManager is shutting down."),
+ true)
+
+ implicit val executionContext = context.dispatcher
+
+ val futureToComplete = Future.sequence(
+ futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
+
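+    // Wait (up to the configured timeout) until all pending clean-up futures, such as
+    // the removal of job graphs from the state backend, have completed.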
+ Await.ready(futureToComplete, timeout)
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -235,9 +246,11 @@ class JobManager(
case RevokeLeadership =>
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
- future {
- cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
- }(context.dispatcher)
+ val newFuturesToComplete = cancelAndClearEverything(
+ new Exception("JobManager is no longer the leader."),
+ false)
+
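+      // Keep the clean-up futures around; postStop awaits their completion before
+      // the job manager finally stops.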
+ futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
// disconnect the registered task managers
instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -315,9 +328,15 @@ class JobManager(
val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
jobGraph.getSessionTimeout)
- future {
- submitJob(jobGraph, jobInfo)
- }(context.dispatcher)
+ submitJob(jobGraph, jobInfo)
+
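+    // Re-submission of a recovered job: only submit it if it is not yet among the
+    // currently running jobs.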
+ case RecoverSubmittedJob(submittedJobGraph) =>
+ if (!currentJobs.contains(submittedJobGraph.getJobId)) {
+ submitJob(
+ submittedJobGraph.getJobGraph(),
+ submittedJobGraph.getJobInfo(),
+ isRecovery = true)
+ }
case RecoverJob(jobId) =>
future {
@@ -328,19 +347,18 @@ class JobManager(
log.info(s"Attempting to recover job $jobId.")
- val jobGraph = submittedJobGraphs.recoverJobGraph(jobId)
+ val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
- if (jobGraph.isDefined) {
- if (!leaderElectionService.hasLeadership()) {
- // we've lost leadership. mission: abort.
- log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
- }
- else {
- recoverJobGraph(jobGraph.get)
- }
- }
- else {
- log.warn(s"Failed to recover job graph ${jobId}.")
+ submittedJobGraphOption match {
+ case Some(submittedJobGraph) =>
+ if (!leaderElectionService.hasLeadership()) {
+ // we've lost leadership. mission: abort.
+ log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+ }
+ else {
+ self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+ }
+ case None => log.warn(s"Failed to recover job graph $jobId.")
}
}
}(context.dispatcher)
@@ -362,7 +380,10 @@ class JobManager(
else {
log.debug(s"Attempting to recover ${jobGraphs.size} job graphs.")
- jobGraphs.foreach(recoverJobGraph(_))
+          jobGraphs.foreach { submittedJobGraph =>
+            self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+          }
}
}
}(context.dispatcher)
@@ -473,7 +494,7 @@ class JobManager(
if (newJobStatus.isTerminalState()) {
jobInfo.end = timeStamp
- future {
+          future {
// TODO If removing the JobGraph from the SubmittedJobGraphsStore fails, the job will
// linger around and potentially be recovered at a later time. There is nothing we
// can do about that, but it should be communicated with the Client.
@@ -483,11 +504,11 @@ class JobManager(
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
// remove only if no activity occurred in the meantime
if (lastActivity == jobInfo.lastActive) {
- removeJob(jobID)
+ self ! decorateMessage(RemoveJob(jobID, true))
}
- }
+ }(context.dispatcher)
} else {
- removeJob(jobID)
+ self ! decorateMessage(RemoveJob(jobID, true))
}
// is the client waiting for the job result?
@@ -539,9 +560,7 @@ class JobManager(
}(context.dispatcher)
}
case None =>
- future {
- removeJob(jobID)
- }(context.dispatcher)
+ self ! decorateMessage(RemoveJob(jobID, true))
}
case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -646,9 +665,7 @@ class JobManager(
case Heartbeat(instanceID, metricsReport, accumulators) =>
log.debug(s"Received hearbeat message from $instanceID.")
- Future {
- updateAccumulators(accumulators)
- }(context.dispatcher)
+ updateAccumulators(accumulators)
instanceManager.reportHeartBeat(instanceID, metricsReport)
@@ -671,11 +688,26 @@ class JobManager(
case RequestJobManagerStatus =>
sender() ! decorateMessage(JobManagerStatusAlive)
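+
+    // removeJob may return a future for the blocking state backend clean-up; it is
+    // tracked in futuresToComplete so that postStop can await it.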
+ case RemoveJob(jobID, clearPersistedJob) =>
+ currentJobs.get(jobID) match {
+ case Some((graph, info)) =>
+ removeJob(graph.getJobID, clearPersistedJob) match {
+ case Some(futureToComplete) =>
+ futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
+ case None =>
+ }
+ case None =>
+ }
+
case RemoveCachedJob(jobID) =>
currentJobs.get(jobID) match {
case Some((graph, info)) =>
if (graph.getState.isTerminalState) {
- removeJob(graph.getJobID)
+ removeJob(graph.getJobID, true) match {
+ case Some(futureToComplete) =>
+ futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
+ case None =>
+ }
} else {
// triggers removal upon completion of job
info.sessionAlive = false
@@ -761,6 +793,7 @@ class JobManager(
jobGraph.getClasspaths,
userCodeLoader)
+ currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
graph
}
@@ -878,22 +911,6 @@ class JobManager(
executionGraph.registerExecutionListener(gateway)
executionGraph.registerJobStatusListener(gateway)
}
-
- if (isRecovery) {
- executionGraph.restoreLatestCheckpointedState()
- }
- else {
- submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
- }
-
- // Add the job graph only after everything is finished. Otherwise there can be races in
- // tests, which check the currentJobs (for example before killing a JM).
- if (!currentJobs.contains(jobId)) {
- currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
- }
-
- // done with submitting the job
- jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
}
catch {
case t: Throwable =>
@@ -916,20 +933,39 @@ class JobManager(
return
}
- if (leaderElectionService.hasLeadership) {
- // There is a small chance that multiple job managers schedule the same job after if they
- // try to recover at the same time. This will eventually be noticed, but can not be ruled
- // out from the beginning.
-
- // NOTE: Scheduling the job for execution is a separate action from the job submission.
- // The success of submitting the job must be independent from the success of scheduling
- // the job.
+      // Execute the recovery (or the writing of the jobGraph into the
+      // SubmittedJobGraphStore) asynchronously, because both are blocking operations
+ future {
try {
- log.info(s"Scheduling job $jobId ($jobName).")
+ if (isRecovery) {
+ executionGraph.restoreLatestCheckpointedState()
+ }
+ else {
+ submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
+ }
- executionGraph.scheduleForExecution(scheduler)
- }
- catch {
+ jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+
+ if (leaderElectionService.hasLeadership) {
+        if (leaderElectionService.hasLeadership) {
+          // There is a small chance that multiple job managers schedule the same job if
+          // they try to recover at the same time. This will eventually be noticed, but
+          // cannot be ruled out from the beginning.
+
+ // NOTE: Scheduling the job for execution is a separate action from the job submission.
+ // The success of submitting the job must be independent from the success of scheduling
+ // the job.
+ log.info(s"Scheduling job $jobId ($jobName).")
+
+ executionGraph.scheduleForExecution(scheduler)
+ } else {
+ // Remove the job graph. Otherwise it will be lingering around and possibly removed from
+ // ZooKeeper by this JM.
+ self ! decorateMessage(RemoveJob(jobId, false))
+
+ log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
+ "this. I am not scheduling the job for execution.")
+ }
+ } catch {
case t: Throwable => try {
executionGraph.fail(t)
}
@@ -939,27 +975,6 @@ class JobManager(
}
}
}
- }
- else {
- // Remove the job graph. Otherwise it will be lingering around and possibly removed from
- // ZooKeeper by this JM.
- currentJobs.remove(jobId)
-
- log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
- "this. I am not scheduling the job for execution.")
- }
- }
- }
-
- /**
- * Submits the job if it is not already one of our current jobs.
- *
- * @param jobGraph Job to recover
- */
- private def recoverJobGraph(jobGraph: SubmittedJobGraph): Unit = {
- if (!currentJobs.contains(jobGraph.getJobId)) {
- future {
- submitJob(jobGraph.getJobGraph(), jobGraph.getJobInfo(), isRecovery = true)
}(context.dispatcher)
}
}
@@ -1169,20 +1184,24 @@ class JobManager(
* might block. Therefore be careful not to block the actor thread.
*
* @param jobID ID of the job to remove and archive
+ * @param removeJobFromStateBackend true if the job shall be archived and removed from the state
+ * backend
*/
- private def removeJob(jobID: JobID): Unit = {
- currentJobs.synchronized {
- // Don't remove the job yet...
- currentJobs.get(jobID) match {
- case Some((eg, _)) =>
- try {
- // ...otherwise, we can have lingering resources when there is a concurrent shutdown
- // and the ZooKeeper client is closed. Not removing the job immediately allow the
- // shutdown to release all resources.
- submittedJobGraphs.removeJobGraph(jobID)
- } catch {
- case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
- }
+ private def removeJob(jobID: JobID, removeJobFromStateBackend: Boolean): Option[Future[Unit]] = {
+ // Don't remove the job yet...
+ val futureOption = currentJobs.get(jobID) match {
+ case Some((eg, _)) =>
+ val result = if (removeJobFromStateBackend) {
+ val futureOption = Some(future {
+ try {
+              // ...otherwise, we can have lingering resources when there is a concurrent
+              // shutdown and the ZooKeeper client is closed. Not removing the job
+              // immediately allows the shutdown to release all resources.
+ submittedJobGraphs.removeJobGraph(jobID)
+ } catch {
+ case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+ }
+ }(context.dispatcher))
try {
eg.prepareForArchiving()
@@ -1193,9 +1212,15 @@ class JobManager(
"archiving.", t)
}
- currentJobs.remove(jobID)
- case None =>
- }
+ futureOption
+ } else {
+ None
+ }
+
+ currentJobs.remove(jobID)
+
+ result
+ case None => None
}
try {
@@ -1204,6 +1229,8 @@ class JobManager(
case t: Throwable =>
log.error(s"Could not properly unregister job $jobID form the library cache.", t)
}
+
+ futureOption
}
/** Fails all currently running jobs and empties the list of currently running jobs. If the
@@ -1211,26 +1238,35 @@ class JobManager(
*
* @param cause Cause for the cancelling.
*/
- private def cancelAndClearEverything(cause: Throwable) {
- for ((jobID, (eg, jobInfo)) <- currentJobs) {
- try {
- submittedJobGraphs.removeJobGraph(jobID)
- }
- catch {
- case t: Throwable => {
- log.error("Error during submitted job graph clean up.", t)
+ private def cancelAndClearEverything(
+ cause: Throwable,
+ removeJobFromStateBackend: Boolean)
+ : Seq[Future[Unit]] = {
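+    // Fail all currently running jobs asynchronously and return the futures, so that
+    // callers can wait for the clean-up to complete (e.g. before shutting down).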
+ val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
+ future {
+ if (removeJobFromStateBackend) {
+ try {
+ submittedJobGraphs.removeJobGraph(jobID)
+ }
+ catch {
+ case t: Throwable => {
+ log.error("Error during submitted job graph clean up.", t)
+ }
+ }
}
- }
- eg.fail(cause)
+ eg.fail(cause)
- if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
- jobInfo.client ! decorateMessage(
- Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
- }
+ if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
+ jobInfo.client ! decorateMessage(
+ Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
+ }
+ }(context.dispatcher)
}
currentJobs.clear()
+
+ futures.toSeq
}
override def grantLeadership(newLeaderSessionID: UUID): Unit = {
@@ -1285,7 +1321,9 @@ class JobManager(
case accumulatorEvent =>
currentJobs.get(accumulatorEvent.getJobID) match {
case Some((jobGraph, jobInfo)) =>
- jobGraph.updateAccumulators(accumulatorEvent)
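+          // Run the accumulator update asynchronously so that it does not block the
+          // actor's message processing.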
+ future {
+ jobGraph.updateAccumulators(accumulatorEvent)
+ }(context.dispatcher)
case None =>
// ignore accumulator values for old job
}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index d77662242e198..8097bdce89ff4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
import org.apache.flink.runtime.instance.{InstanceID, Instance}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
import org.apache.flink.runtime.util.SerializedThrowable
import scala.collection.JavaConverters._
@@ -72,6 +73,14 @@ object JobManagerMessages {
*/
case class RecoverJob(jobId: JobID) extends RequiresLeaderSessionID
+ /**
+ * Triggers the submission of the recovered job
+ *
+ * @param submittedJobGraph Contains the submitted JobGraph and the associated JobInfo
+ */
+ case class RecoverSubmittedJob(submittedJobGraph: SubmittedJobGraph)
+ extends RequiresLeaderSessionID
+
/**
* Triggers recovery of all available jobs.
*/
@@ -286,6 +295,14 @@ object JobManagerMessages {
*/
case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
+ /** Triggers the removal of the job with the given job ID
+ *
+   * @param jobID The ID of the job to remove
+ * @param removeJobFromStateBackend true if the job has properly finished
+ */
+ case class RemoveJob(jobID: JobID, removeJobFromStateBackend: Boolean = true)
+ extends RequiresLeaderSessionID
+
/**
* Removes the job belonging to the job identifier from the job manager and archives it.
* @param jobID The job identifier
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
index ac250bd82a1e9..e6156e5312bd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -194,7 +194,9 @@ public void testSubmitJobToNonLeader() throws Exception {
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING,
leadingJobManager, deadline.timeLeft());
- // Make sure that the **non-leading** JM has actually removed the job graph from her
+ log.info("Wait that the non-leader removes the submitted job.");
+
+ // Make sure that the **non-leading** JM has actually removed the job graph from its
// local state.
boolean success = false;
while (!success && deadline.hasTimeLeft()) {
@@ -205,6 +207,7 @@ public void testSubmitJobToNonLeader() throws Exception {
success = true;
}
else {
+ log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString());
Thread.sleep(100);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index 7ae89d16f66b0..94e198805179e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -26,6 +26,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import java.util.List;
+
/**
* Simple ZooKeeper and CuratorFramework setup for tests.
*/
@@ -111,6 +113,14 @@ public CuratorFramework getClient() {
return client;
}
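+
+	/** Returns the namespace (root path) under which the test client operates. */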
+ public String getClientNamespace() {
+ return client.getNamespace();
+ }
+
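+	/** Returns the children stored below the given path, using the test client. */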
+	public List<String> getChildren(String path) throws Exception {
+ return client.getChildren().forPath(path);
+ }
+
/**
* Creates a new client for the started ZooKeeper server/cluster.
*/
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 76b237e7e92a2..1ca02aaa9b9ef 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
# -----------------------------------------------------------------------------
# Console (use 'console')
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3a252f8cfe9ec..0f800c992969b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -657,7 +657,7 @@ class JobManagerITCase(_system: ActorSystem)
jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
- // job stil running
+ // job still running
jm.tell(RemoveCachedJob(jobGraph2.getJobID), self)
expectMsgType[JobResultSuccess]
diff --git a/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
new file mode 100644
index 0000000000000..c0a2adc5c8c9d
--- /dev/null
+++ b/flink-shaded-curator/flink-shaded-curator-recipes/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements. See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership. The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-shaded-curator</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator-recipes</artifactId>
+	<name>flink-shaded-curator-recipes</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-shaded-curator/flink-shaded-curator-test/pom.xml b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
new file mode 100644
index 0000000000000..2700c0c3fa6ba
--- /dev/null
+++ b/flink-shaded-curator/flink-shaded-curator-test/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements. See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership. The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-shaded-curator</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded-curator-test</artifactId>
+	<name>flink-shaded-curator-test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
+								<includes>
+									<include>org.apache.curator:curator-test</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>org.apache.curator</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
index ac62cc8c97b14..29d646184007f 100644
--- a/flink-shaded-curator/pom.xml
+++ b/flink-shaded-curator/pom.xml
@@ -1,22 +1,21 @@
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
@@ -29,50 +28,13 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
+	<modules>
+		<module>flink-shaded-curator-recipes</module>
+		<module>flink-shaded-curator-test</module>
+	</modules>
+
 	<artifactId>flink-shaded-curator</artifactId>
 	<name>flink-shaded-curator</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-recipes</artifactId>
-			<version>${curator.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet>
-								<excludes>
-									<exclude>log4j</exclude>
-									<exclude>org.slf4j:slf4j-log4j12</exclude>
-								</excludes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
+	<packaging>pom</packaging>
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 0dd20b1533a33..b9bae6ff918b4 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -147,6 +147,13 @@ under the License.
test
+
+ org.apache.flink
+ flink-shaded-curator-test
+ ${project.version}
+ test
+
+
org.scalatest
scalatest_${scala.binary.version}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index a0c831238c474..2cdf83c20e119 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -523,22 +523,19 @@ private void submitJobGraph(
}
private void checkCleanRecoveryState(Configuration config) throws Exception {
- LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+ LOG.info("Checking " + ZooKeeper.getClientNamespace() +
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
-		List<String> jobGraphs = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		List<String> jobGraphs = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
assertEquals("Unclean job graphs: " + jobGraphs, 0, jobGraphs.size());
- LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+ LOG.info("Checking " + ZooKeeper.getClientNamespace() +
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
-		List<String> checkpoints = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+		List<String> checkpoints = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
assertEquals("Unclean checkpoints: " + checkpoints, 0, checkpoints.size());
- LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+ LOG.info("Checking " + ZooKeeper.getClientNamespace() +
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
-		List<String> checkpointCounter = ZooKeeper.getClient().getChildren()
-				.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		List<String> checkpointCounter = ZooKeeper.getChildren(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
assertEquals("Unclean checkpoint counter: " + checkpointCounter, 0, checkpointCounter.size());
LOG.info("ZooKeeper state is clean");
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 94d0a81a75b38..a05621a985d5e 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -22,6 +22,7 @@
import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -37,7 +38,9 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
@@ -53,6 +56,9 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
private static final int numberApplicationAttempts = 10;
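+
+	// Temporary directory for the file system state backend configured in testMultipleAMKill.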
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
@BeforeClass
public static void setup() {
actorSystem = AkkaUtils.createDefaultActorSystem();
@@ -102,9 +108,14 @@ public void testMultipleAMKill() throws Exception {
String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.setConfigurationDirectory(confDirPath);
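+		// Configure a file system state backend pointing to a temporary folder, so that
+		// checkpoints and recovery state are written to a fresh, known location.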
+ String fsStateHandlePath = tmp.getRoot().getPath();
+
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
- zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts);
+ zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
+ "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+ "@@" + ConfigConstants.STATE_BACKEND_FS_DIR + "=" + fsStateHandlePath + "/checkpoints" +
+ "@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
AbstractFlinkYarnCluster yarnCluster = null;
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 83d1f3c2fa54d..fa7003953ac19 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -21,8 +21,10 @@ package org.apache.flink.yarn
import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
@@ -60,7 +62,9 @@ class TestingYarnJobManager(
delayBetweenRetries: Long,
timeout: FiniteDuration,
mode: StreamingMode,
- leaderElectionService: LeaderElectionService)
+ leaderElectionService: LeaderElectionService,
+    submittedJobGraphs: SubmittedJobGraphStore,
+    checkpointRecoveryFactory: CheckpointRecoveryFactory)
extends YarnJobManager(
flinkConfiguration,
executionContext,
@@ -72,7 +76,9 @@ class TestingYarnJobManager(
delayBetweenRetries,
timeout,
mode,
- leaderElectionService)
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory)
with TestingJobManagerLike {
override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]