Skip to content

Commit

Permalink
[FLINK-5232] Introduce RobustActorSystem
Browse files Browse the repository at this point in the history
The RobustActorSystem has a configurable UncaughtExceptionHandler. By default, it is started
with the FatalExitExceptionHandler. This handler terminates the JVM whenever it sees an
uncaught exception.

This closes apache#6334.
  • Loading branch information
tillrohrmann committed Aug 14, 2018
1 parent fb4eb93 commit 6fdeb5d
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 10 deletions.
98 changes: 98 additions & 0 deletions flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.actor

import java.lang.Thread.UncaughtExceptionHandler

import akka.actor.ActorSystem.findClassLoader
import akka.actor.setup.ActorSystemSetup
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.runtime.util.FatalExitExceptionHandler

import scala.concurrent.ExecutionContext

/**
  * An [[ActorSystemImpl]] whose [[java.lang.Thread.UncaughtExceptionHandler]] can be supplied
  * by the caller.
  *
  * All constructor arguments except the last one are forwarded unchanged to
  * [[ActorSystemImpl]].
  *
  * @param optionalUncaughtExceptionHandler handler to use for uncaught exceptions; when empty,
  *                                         the handler defined by [[ActorSystemImpl]] is used
  */
class RobustActorSystem(
    name: String,
    applicationConfig: Config,
    classLoader: ClassLoader,
    defaultExecutionContext: Option[ExecutionContext],
    guardianProps: Option[Props],
    setup: ActorSystemSetup,
    val optionalUncaughtExceptionHandler: Option[UncaughtExceptionHandler])
  extends ActorSystemImpl(
    name,
    applicationConfig,
    classLoader,
    defaultExecutionContext,
    guardianProps,
    setup) {

  /**
    * Returns the configured handler if one was given, otherwise the handler of the super class.
    */
  override protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
    optionalUncaughtExceptionHandler match {
      case Some(configuredHandler) => configuredHandler
      case None => super.uncaughtExceptionHandler
    }
}

/**
  * Factory methods for started [[RobustActorSystem]] instances.
  */
object RobustActorSystem {

  /**
    * Creates and starts a [[RobustActorSystem]] from the given configuration, using
    * `FatalExitExceptionHandler.INSTANCE` as the uncaught exception handler.
    */
  def create(name: String, applicationConfig: Config): RobustActorSystem =
    apply(name, setupFromConfig(applicationConfig))

  /**
    * Creates and starts a [[RobustActorSystem]] from the given configuration with the given
    * uncaught exception handler.
    */
  def create(
      name: String,
      applicationConfig: Config,
      uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem =
    apply(name, setupFromConfig(applicationConfig), uncaughtExceptionHandler)

  /**
    * Creates and starts a [[RobustActorSystem]] from the given setup, using
    * `FatalExitExceptionHandler.INSTANCE` as the uncaught exception handler.
    */
  def apply(name: String, setup: ActorSystemSetup): RobustActorSystem =
    internalApply(name, setup, Some(FatalExitExceptionHandler.INSTANCE))

  /**
    * Creates and starts a [[RobustActorSystem]] from the given setup with the given uncaught
    * exception handler.
    */
  def apply(
      name: String,
      setup: ActorSystemSetup,
      uncaughtExceptionHandler: UncaughtExceptionHandler): RobustActorSystem =
    internalApply(name, setup, Some(uncaughtExceptionHandler))

  /**
    * Resolves class loader, configuration and default execution context from the
    * [[BootstrapSetup]] contained in `setup` (if any), then instantiates and starts the
    * actor system.
    *
    * @param name name of the actor system
    * @param setup setup from which the bootstrap settings are read
    * @param uncaughtExceptionHandler optional handler; passed on to the [[RobustActorSystem]]
    * @return the started actor system
    */
  def internalApply(
      name: String,
      setup: ActorSystemSetup,
      uncaughtExceptionHandler: Option[UncaughtExceptionHandler]): RobustActorSystem = {
    val bootstrap = setup.get[BootstrapSetup]

    // Every setting falls back to a default when the bootstrap setup does not define it.
    val resolvedClassLoader = bootstrap.flatMap(_.classLoader).getOrElse(findClassLoader())
    val resolvedConfig =
      bootstrap.flatMap(_.config).getOrElse(ConfigFactory.load(resolvedClassLoader))
    val resolvedExecutionContext = bootstrap.flatMap(_.defaultExecutionContext)

    val actorSystem = new RobustActorSystem(
      name = name,
      applicationConfig = resolvedConfig,
      classLoader = resolvedClassLoader,
      defaultExecutionContext = resolvedExecutionContext,
      guardianProps = None,
      setup = setup,
      optionalUncaughtExceptionHandler = uncaughtExceptionHandler)

    actorSystem.start()
  }

  /** Wraps the given configuration into an [[ActorSystemSetup]] with a [[BootstrapSetup]]. */
  private def setupFromConfig(applicationConfig: Config): ActorSystemSetup =
    ActorSystemSetup.create(BootstrapSetup(None, Option(applicationConfig), None))
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object AkkaUtils {
def createActorSystem(akkaConfig: Config): ActorSystem = {
// Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory)
new RobustActorSystem("flink", akkaConfig)
RobustActorSystem.create("flink", akkaConfig)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand All @@ -41,13 +42,13 @@
/**
* Tests for {@link FlinkUntypedActor}.
*/
public class FlinkUntypedActorTest {
public class FlinkUntypedActorTest extends TestLogger {

private static ActorSystem actorSystem;

@BeforeClass
public static void setup() {
actorSystem = new RobustActorSystem("TestingActorSystem", TestingUtils.testConfig());
actorSystem = RobustActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -48,15 +49,15 @@
/**
* Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
*/
public class InstanceManagerTest{
public class InstanceManagerTest extends TestLogger {

static ActorSystem system;

static UUID leaderSessionID = UUID.randomUUID();

@BeforeClass
public static void setup(){
system = new RobustActorSystem("TestingActorSystem", TestingUtils.testConfig());
system = RobustActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
Expand Down Expand Up @@ -79,7 +77,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {

@BeforeClass
public static void setup() throws Exception {
actorSystem = new RobustActorSystem("TestingActorSystem", TestingUtils.getDefaultTestingActorSystemConfig());
actorSystem = RobustActorSystem.create("TestingActorSystem", TestingUtils.getDefaultTestingActorSystemConfig());
testingServer = new TestingServer();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.actor

import java.lang.Thread.UncaughtExceptionHandler

import org.apache.flink.runtime.akka.AkkaUtils
import org.junit.{After, Before, Test}
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}
import scala.util.Success

/**
  * Tests that uncaught exceptions thrown inside a [[RobustActorSystem]] are reported to the
  * configured [[java.lang.Thread.UncaughtExceptionHandler]].
  */
class RobustActorSystemTest extends JUnitSuite with Matchers {

  // Upper bound for waiting on the handler; a bounded wait makes a regression fail the test
  // instead of blocking the build forever.
  private val awaitTimeout = Duration(30L, "seconds")

  // Re-created for every test by setup().
  var robustActorSystem: RobustActorSystem = null
  var testingUncaughtExceptionHandler: TestingUncaughtExceptionHandler = null

  /** Starts a fresh actor system which reports uncaught exceptions to the testing handler. */
  @Before
  def setup(): Unit = {
    testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler
    robustActorSystem = RobustActorSystem.create(
      "testSystem",
      AkkaUtils.testDispatcherConfig,
      testingUncaughtExceptionHandler)
  }

  /** Triggers termination of the actor system and drops the handler reference. */
  @After
  def teardown(): Unit = {
    robustActorSystem.terminate()
    testingUncaughtExceptionHandler = null
  }

  /** An error thrown by a task running on the system's dispatcher must reach the handler. */
  @Test
  def testUncaughtExceptionHandler(): Unit = {
    val error = new UnknownError("Foobar")

    Future {
      throw error
    }(robustActorSystem.dispatcher)

    val caughtException = Await.result(
      testingUncaughtExceptionHandler.exceptionPromise.future,
      awaitTimeout)

    caughtException should equal (error)
  }

  /** An error thrown by an actor while processing a message must reach the handler. */
  @Test
  def testUncaughtExceptionHandlerFromActor(): Unit = {
    val error = new UnknownError()
    val actor = robustActorSystem.actorOf(Props.create(classOf[UncaughtExceptionActor], error))

    // The actor throws the error passed to its constructor when it receives Failure.
    actor ! Failure

    val caughtException = Await.result(
      testingUncaughtExceptionHandler.exceptionPromise.future,
      awaitTimeout)

    caughtException should equal (error)
  }
}

/**
  * [[java.lang.Thread.UncaughtExceptionHandler]] which exposes the first uncaught exception it
  * receives through [[exceptionPromise]].
  */
class TestingUncaughtExceptionHandler extends UncaughtExceptionHandler {

  /** Completed successfully with the first exception passed to [[uncaughtException]]. */
  val exceptionPromise: Promise[Throwable] = Promise()

  /**
    * Records the given exception if none has been recorded yet.
    *
    * @param t thread in which the exception occurred (unused)
    * @param e the uncaught exception
    */
  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    // tryComplete instead of complete: completing an already completed promise throws an
    // IllegalStateException, which must not happen inside the handler when more than one
    // uncaught exception is reported. Later exceptions are ignored.
    exceptionPromise.tryComplete(Success(e))
  }
}

/**
  * Actor which throws the given `failure` when it receives the [[Failure]] message.
  *
  * @param failure throwable to throw from the message handler
  */
class UncaughtExceptionActor(failure: Throwable) extends Actor {
  override def receive: Receive = {
    case Failure => throw failure
  }
}

/** Message which makes an [[UncaughtExceptionActor]] throw its failure. */
case object Failure
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private void testCheckpointedStreamingProgram(AbstractStateBackend stateBackend)
final int sequenceEnd = 5000;
final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2;

final ActorSystem system = new RobustActorSystem("Test", AkkaUtils.getDefaultAkkaConfig());
final ActorSystem system = RobustActorSystem.create("Test", AkkaUtils.getDefaultAkkaConfig());
final TestingServer testingServer = new TestingServer();
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
@Test
public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException {

final ActorSystem system = new RobustActorSystem("Testkit", AkkaUtils.getDefaultAkkaConfig());
final ActorSystem system = RobustActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
LocalFlinkMiniCluster miniCluster = null;

final int numTMs = 3;
Expand Down

0 comments on commit 6fdeb5d

Please sign in to comment.