Skip to content

Commit

Permalink
[FLINK-3363] [jobmanager] Properly shut down executor thread pool whe…
Browse files Browse the repository at this point in the history
…n JobManager shuts down
  • Loading branch information
StephanEwen committed Feb 8, 2016
1 parent af3e689 commit a277543
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
import java.util.UUID
import java.util.concurrent.ExecutorService

import akka.actor.Status.Failure
import akka.actor._
Expand Down Expand Up @@ -90,7 +91,7 @@ import scala.language.postfixOps
* is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]]
*
* - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an
ExecutionVertex contained in the [[ExecutionGraph]].
* ExecutionVertex contained in the [[ExecutionGraph]].
* A successful update is acknowledged by true and otherwise false.
*
* - [[RequestNextInputSplit]] requests the next input split for a running task on a
Expand All @@ -102,7 +103,7 @@ import scala.language.postfixOps
*/
class JobManager(
protected val flinkConfiguration: Configuration,
protected val executionContext: ExecutionContext,
protected val executorService: ExecutorService,
protected val instanceManager: InstanceManager,
protected val scheduler: FlinkScheduler,
protected val libraryCacheManager: BlobLibraryCacheManager,
Expand All @@ -121,6 +122,15 @@ class JobManager(

override val log = Logger(getClass)

/** The extra execution context, for futures, with a custom logging reporter */
protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
executorService,
(t: Throwable) => {
if (!context.system.isTerminated) {
log.error("Executor could not execute task", t)
}
})

/** Either running or not yet archived jobs (session hasn't been ended). */
protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()

Expand Down Expand Up @@ -246,6 +256,9 @@ class JobManager(
case e: IOException => log.error("Could not properly shutdown the library cache manager.", e)
}

// shut down the extra thread pool for futures
executorService.shutdown()

log.debug(s"Job manager ${self.path} is completely stopped.")
}

Expand Down Expand Up @@ -1503,7 +1516,8 @@ class JobManager(

/**
* Updates the accumulators reported from a task manager via the Heartbeat message.
* @param accumulators list of accumulator snapshots
*
* @param accumulators list of accumulator snapshots
*/
private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
accumulators foreach {
Expand Down Expand Up @@ -2016,7 +2030,7 @@ object JobManager {
def createJobManagerComponents(
configuration: Configuration,
leaderElectionServiceOption: Option[LeaderElectionService]) :
(ExecutionContext,
(ExecutorService,
InstanceManager,
FlinkScheduler,
BlobLibraryCacheManager,
Expand Down Expand Up @@ -2064,17 +2078,19 @@ object JobManager {
}
}

val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())


var blobServer: BlobServer = null
var instanceManager: InstanceManager = null
var scheduler: FlinkScheduler = null
var libraryCacheManager: BlobLibraryCacheManager = null

val executorService: ExecutorService = new ForkJoinPool()

try {
blobServer = new BlobServer(configuration)
instanceManager = new InstanceManager()
scheduler = new FlinkScheduler(executionContext)
scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService))
libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)

instanceManager.addInstanceListener(scheduler)
Expand All @@ -2093,6 +2109,8 @@ object JobManager {
if (blobServer != null) {
blobServer.shutdown()
}
executorService.shutdownNow()

throw t
}

Expand Down Expand Up @@ -2122,7 +2140,7 @@ object JobManager {
new ZooKeeperCheckpointRecoveryFactory(client, configuration))
}

(executionContext,
(executorService,
instanceManager,
scheduler,
libraryCacheManager,
Expand All @@ -2143,8 +2161,7 @@ object JobManager {
* @param actorSystem The actor system running the JobManager
* @param jobManagerClass The class of the JobManager to be started
* @param archiveClass The class of the MemoryArchivist to be started
*
* @return A tuple of references (JobManager Ref, Archiver Ref)
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(
configuration: Configuration,
Expand Down Expand Up @@ -2174,8 +2191,7 @@ object JobManager {
* the actor will have the name generated by the actor system.
* @param jobManagerClass The class of the JobManager to be started
* @param archiveClass The class of the MemoryArchivist to be started
*
* @return A tuple of references (JobManager Ref, Archiver Ref)
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(
configuration: Configuration,
Expand All @@ -2186,7 +2202,7 @@ object JobManager {
archiveClass: Class[_ <: MemoryArchivist])
: (ActorRef, ActorRef) = {

val (executionContext,
val (executorService: ExecutorService,
instanceManager,
scheduler,
libraryCacheManager,
Expand All @@ -2211,7 +2227,7 @@ object JobManager {
val jobManagerProps = Props(
jobManagerClass,
configuration,
executionContext,
executorService,
instanceManager,
scheduler,
libraryCacheManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory

import scala.concurrent.duration.FiniteDuration
import scala.concurrent._
import scala.concurrent.forkjoin.ForkJoinPool

/**
* Abstract base class for Flink's mini cluster. The mini cluster starts a
Expand Down Expand Up @@ -82,7 +83,7 @@ abstract class FlinkMiniCluster(

/** Future lock */
val futureLock = new Object()

implicit val executionContext = ExecutionContext.global

implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
Expand Down Expand Up @@ -320,8 +321,6 @@ abstract class FlinkMiniCluster(
_.map(gracefulStop(_, timeout))
} getOrElse(Seq())

implicit val executionContext = ExecutionContext.global

Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)

if (!useSingleActorSystem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.forkjoin.ForkJoinPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class JobManagerLeaderElectionTest extends TestLogger {
Expand All @@ -62,14 +64,16 @@ public class JobManagerLeaderElectionTest extends TestLogger {

private static ActorSystem actorSystem;
private static TestingServer testingServer;
private static ExecutorService executor;

private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);

@BeforeClass
public static void setup() throws Exception {
actorSystem = ActorSystem.create("TestingActorSystem");

testingServer = new TestingServer();
executor = new ForkJoinPool();
}

@AfterClass
Expand All @@ -78,9 +82,13 @@ public static void teardown() throws Exception {
JavaTestKit.shutdownActorSystem(actorSystem);
}

if(testingServer != null) {
if (testingServer != null) {
testingServer.stop();
}

if (executor != null) {
executor.shutdownNow();
}
}

/**
Expand Down Expand Up @@ -175,10 +183,10 @@ private Props createJobManagerProps(Configuration configuration) throws Exceptio
return Props.create(
TestingJobManager.class,
configuration,
TestingUtils.defaultExecutionContext(),
executor,
new InstanceManager(),
new Scheduler(TestingUtils.defaultExecutionContext()),
new BlobLibraryCacheManager(new BlobServer(configuration), 10l),
new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
ActorRef.noSender(),
1,
1L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.testingUtils

import akka.actor.ActorRef

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
Expand All @@ -27,25 +28,17 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps

import java.util.concurrent.ExecutorService

/** JobManager implementation extended by testing messages
*
* @param flinkConfiguration
* @param executionContext
* @param instanceManager
* @param scheduler
* @param libraryCacheManager
* @param archive
* @param defaultExecutionRetries
* @param delayBetweenRetries
* @param timeout
*/
class TestingJobManager(
flinkConfiguration: Configuration,
executionContext: ExecutionContext,
executorService: ExecutorService,
instanceManager: InstanceManager,
scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager,
Expand All @@ -58,7 +51,7 @@ class TestingJobManager(
checkpointRecoveryFactory : CheckpointRecoveryFactory)
extends JobManager(
flinkConfiguration,
executionContext,
executorService,
instanceManager,
scheduler,
libraryCacheManager,
Expand Down
Loading

0 comments on commit a277543

Please sign in to comment.