org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
diff --git a/flink-rpc/flink-rpc-akka-loader/pom.xml b/flink-rpc/flink-rpc-akka-loader/pom.xml
new file mode 100644
index 0000000000000..304826d845698
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka-loader/pom.xml
@@ -0,0 +1,127 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.flink
+ flink-rpc
+ 1.14-SNAPSHOT
+ ..
+
+
+ flink-rpc-akka-loader
+ Flink : RPC : Akka-Loader
+ jar
+ This module contains the mechanism for loading flink-rpc-akka through a separate classloader.
+
+
+
+ org.apache.flink
+ flink-core
+ ${project.version}
+
+
+ org.apache.flink
+ flink-rpc-core
+ ${project.version}
+
+
+
+
+ org.apache.flink
+ flink-rpc-akka
+ ${project.version}
+
+ runtime
+
+ true
+
+
+
+ *
+ *
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-rpc-akka-jars
+ process-resources
+
+ copy
+
+
+
+
+ org.apache.flink
+ flink-rpc-akka
+ ${project.version}
+ jar
+ true
+ flink-rpc-akka.jar
+
+
+ ${project.build.directory}/classes
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-flink
+ package
+
+ shade
+
+
+
+
+ org.apache.flink:flink-rpc-akka
+
+
+
+
+ org.apache.flink:flink-rpc-akka
+
+ META-INF/NOTICE
+ META-INF/licenses/**
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
new file mode 100644
index 0000000000000..0a3d0b8a1a510
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.classloading.SubmoduleClassLoader;
+import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.rpc.RpcSystemLoader;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ServiceLoader;
+import java.util.UUID;
+
+/**
+ * Loader for the {@link AkkaRpcSystemLoader}.
+ *
+ * This loader expects the flink-rpc-akka jar to be accessible via {@link
+ * ClassLoader#getResource(String)}. It will extract the jar into a temporary directory and create a
+ * new {@link SubmoduleClassLoader} to load the rpc system from that jar.
+ */
+public class AkkaRpcSystemLoader implements RpcSystemLoader {
+
+ @Override
+ public RpcSystem loadRpcSystem(Configuration config) {
+ try {
+ final ClassLoader flinkClassLoader = RpcSystem.class.getClassLoader();
+
+ final String tmpDirectory = ConfigurationUtils.parseTempDirectories(config)[0];
+ final Path tempFile =
+ Files.createFile(
+ Paths.get(
+ tmpDirectory, "_flink-rpc-akka_" + UUID.randomUUID() + ".jar"));
+
+ final InputStream resourceStream =
+ flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar");
+ if (resourceStream == null) {
+ throw new RuntimeException(
+ "Akka RPC system could not be found. If this happened while running a test in the IDE,"
+ + "run the process-resources phase on flink-rpc/flink-rpc-akka-loader via maven.");
+ }
+
+ IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
+
+ final SubmoduleClassLoader submoduleClassLoader =
+ new SubmoduleClassLoader(
+ new URL[] {tempFile.toUri().toURL()}, flinkClassLoader);
+
+ return new CleanupOnCloseRpcSystem(
+ ServiceLoader.load(RpcSystem.class, submoduleClassLoader).iterator().next(),
+ submoduleClassLoader,
+ tempFile);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not initialize RPC system.", e);
+ }
+ }
+}
diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
new file mode 100644
index 0000000000000..621047bdd1bca
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.classloading.SubmoduleClassLoader;
+import org.apache.flink.runtime.rpc.AddressResolution;
+import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** An {@link RpcSystem} wrapper that cleans up resources after the RPC system has been closed. */
+class CleanupOnCloseRpcSystem implements RpcSystem {
+ private static final Logger LOG = LoggerFactory.getLogger(CleanupOnCloseRpcSystem.class);
+
+ private final RpcSystem rpcSystem;
+ private final SubmoduleClassLoader pluginLoader;
+ private final Path tempFile;
+
+ public CleanupOnCloseRpcSystem(
+ RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, Path tempFile) {
+ this.rpcSystem = Preconditions.checkNotNull(rpcSystem);
+ this.pluginLoader = Preconditions.checkNotNull(pluginLoader);
+ this.tempFile = Preconditions.checkNotNull(tempFile);
+ }
+
+ @Override
+ public void close() {
+ rpcSystem.close();
+
+ try {
+ pluginLoader.close();
+ } catch (Exception e) {
+ LOG.warn("Could not close RpcSystem classloader.", e);
+ }
+ try {
+ Files.delete(tempFile);
+ } catch (Exception e) {
+ LOG.warn("Could not delete temporary rpc system file {}.", tempFile, e);
+ }
+ }
+
+ @Override
+ public RpcServiceBuilder localServiceBuilder(Configuration config) {
+ return rpcSystem.localServiceBuilder(config);
+ }
+
+ @Override
+ public RpcServiceBuilder remoteServiceBuilder(
+ Configuration configuration,
+ @Nullable String externalAddress,
+ String externalPortRange) {
+ return rpcSystem.remoteServiceBuilder(configuration, externalAddress, externalPortRange);
+ }
+
+ @Override
+ public String getRpcUrl(
+ String hostname,
+ int port,
+ String endpointName,
+ AddressResolution addressResolution,
+ Configuration config)
+ throws UnknownHostException {
+ return rpcSystem.getRpcUrl(hostname, port, endpointName, addressResolution, config);
+ }
+
+ @Override
+ public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception {
+ return rpcSystem.getInetSocketAddressFromRpcUrl(url);
+ }
+
+ @Override
+ public long getMaximumMessageSizeInBytes(Configuration config) {
+ return rpcSystem.getMaximumMessageSizeInBytes(config);
+ }
+}
diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader b/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader
new file mode 100644
index 0000000000000..8387fa4c69658
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader
diff --git a/flink-rpc/flink-rpc-akka/pom.xml b/flink-rpc/flink-rpc-akka/pom.xml
index d7eb9b3545a55..a4cc211759c91 100644
--- a/flink-rpc/flink-rpc-akka/pom.xml
+++ b/flink-rpc/flink-rpc-akka/pom.xml
@@ -29,10 +29,16 @@ under the License.
..
- flink-rpc-akka_${scala.binary.version}
+ flink-rpc-akka
Flink : RPC : Akka
jar
+
+ 2.6.15
+ 2.12
+ 2.12.7
+
+
org.apache.flink
@@ -53,102 +59,91 @@ under the License.
provided
+
+ org.scala-lang
+ scala-reflect
+ compile
+
org.scala-lang
scala-library
+ compile
+
+
+ org.scala-lang
+ scala-compiler
+ compile
com.typesafe.akka
- akka-actor_${scala.binary.version}
-
-
-
- org.scala-lang
- scala-library
-
-
+ akka-actor_${akka.scala.binary.version}
+ ${akka.version}
-
com.typesafe.akka
- akka-remote_${scala.binary.version}
-
+ akka-remote_${akka.scala.binary.version}
+ ${akka.version}
- org.scala-lang
- scala-library
+
+ io.aeron
+ aeron-driver
+
+
+
+ io.aeron
+ aeron-client
-
-
+
+ com.typesafe.akka
+ akka-slf4j_${akka.scala.binary.version}
+ ${akka.version}
+
io.netty
netty
3.10.6.Final
-
- com.typesafe.akka
- akka-stream_${scala.binary.version}
-
-
-
- org.scala-lang
- scala-library
-
-
- com.typesafe
- config
-
-
+ org.clapper
+ grizzled-slf4j_${akka.scala.binary.version}
+ 1.3.2
-
- com.typesafe.akka
- akka-protobuf_${scala.binary.version}
-
-
-
- org.scala-lang
- scala-library
-
-
+ org.slf4j
+ slf4j-api
+ provided
- com.typesafe.akka
- akka-slf4j_${scala.binary.version}
-
-
-
- org.scala-lang
- scala-library
-
-
+ com.google.code.findbugs
+ jsr305
+ provided
- org.clapper
- grizzled-slf4j_${scala.binary.version}
-
-
-
- org.scala-lang
- scala-library
-
-
+ org.apache.flink
+ flink-test-utils-junit
org.apache.flink
- flink-test-utils-junit
+ flink-core
+ ${project.version}
+ test
+ test-jar
+
+
+
+ org.scalatest
+ scalatest_${akka.scala.binary.version}
+ 3.0.0
+ test
@@ -167,10 +162,30 @@ under the License.
+
+ org.scala-lang
+ scala-compiler
+ ${akka.scala.version}
+
+
+ org.scala-lang
+ scala-library
+ ${akka.scala.version}
+
+
+ org.scala-lang
+ scala-reflect
+ ${akka.scala.version}
+
+
+ org.scala-lang.modules
+ scala-xml_${akka.scala.binary.version}
+ 1.0.6
+
com.typesafe
config
- 1.3.0
+ 1.4.0
@@ -201,20 +216,10 @@ under the License.
-
-
-
- com.typesafe.akka:akka-remote_*
- io.netty:netty
+
+ *
-
-
- org.jboss.netty
- org.apache.flink.shaded.akka.org.jboss.netty
-
-
io.netty:netty
@@ -228,11 +233,31 @@ under the License.
+
+
+ reference.conf
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+
+
+
+ enforce-versions
+
+ enforce
+
+ none
+
+
+
+
net.alchim31.maven
@@ -259,6 +284,7 @@ under the License.
+ ${akka.scala.version}
-Xms128m
-Xmx512m
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
index 8c69e14911745..3514704710349 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
@@ -35,13 +35,19 @@
import scala.concurrent.duration.FiniteDuration;
-/** Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}. */
+/**
+ * Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}. Furthermore ensures that the
+ * context class loader is set to the Flink class loader while the runnable is running.
+ */
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
private final ActorSystem actorSystem;
+ private final ClassLoader flinkClassLoader;
- public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
+ public ActorSystemScheduledExecutorAdapter(
+ ActorSystem actorSystem, ClassLoader flinkClassLoader) {
this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
+ this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader, "flinkClassLoader");
}
@Override
@@ -86,7 +92,8 @@ public ScheduledFuture> scheduleAtFixedRate(
.schedule(
new FiniteDuration(initialDelay, unit),
new FiniteDuration(period, unit),
- scheduledFutureTask,
+ ClassLoadingUtils.withContextClassLoader(
+ scheduledFutureTask, flinkClassLoader),
actorSystem.dispatcher());
scheduledFutureTask.setCancellable(cancellable);
@@ -111,13 +118,18 @@ public ScheduledFuture> scheduleWithFixedDelay(
@Override
public void execute(@Nonnull Runnable command) {
- actorSystem.dispatcher().execute(command);
+ actorSystem
+ .dispatcher()
+ .execute(ClassLoadingUtils.withContextClassLoader(command, flinkClassLoader));
}
private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
return actorSystem
.scheduler()
- .scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
+ .scheduleOnce(
+ new FiniteDuration(delay, unit),
+ ClassLoadingUtils.withContextClassLoader(runnable, flinkClassLoader),
+ actorSystem.dispatcher());
}
private long now() {
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java
new file mode 100644
index 0000000000000..623c99f89ea3e
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.akka;
+
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Classloading utilities. */
+public class ClassLoadingUtils {
+
+ /**
+ * Wraps the given runnable in a {@link TemporaryClassLoaderContext} to prevent the plugin class
+ * loader from leaking into Flink.
+ *
+ * @param runnable runnable to wrap
+ * @param contextClassLoader class loader that should be set as the context class loader
+ * @return wrapped runnable
+ */
+ public static Runnable withContextClassLoader(
+ Runnable runnable, ClassLoader contextClassLoader) {
+ return () -> runWithContextClassLoader(runnable::run, contextClassLoader);
+ }
+
+ /**
+ * Wraps the given executor such that all submitted are runnables are run in a {@link
+ * TemporaryClassLoaderContext} based on the given classloader.
+ *
+ * @param executor executor to wrap
+ * @param contextClassLoader class loader that should be set as the context class loader
+ * @return wrapped executor
+ */
+ public static Executor withContextClassLoader(
+ Executor executor, ClassLoader contextClassLoader) {
+ return new ContextClassLoaderSettingExecutor(executor, contextClassLoader);
+ }
+
+ /**
+ * Runs the given runnable in a {@link TemporaryClassLoaderContext} to prevent the plugin class
+ * loader from leaking into Flink.
+ *
+ * @param runnable runnable to run
+ * @param contextClassLoader class loader that should be set as the context class loader
+ */
+ public static void runWithContextClassLoader(
+ ThrowingRunnable runnable, ClassLoader contextClassLoader) throws T {
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(contextClassLoader)) {
+ runnable.run();
+ }
+ }
+
+ /**
+ * Runs the given supplier in a {@link TemporaryClassLoaderContext} based on the given
+ * classloader.
+ *
+ * @param supplier supplier to run
+ * @param contextClassLoader class loader that should be set as the context class loader
+ */
+ public static T runWithContextClassLoader(
+ SupplierWithException supplier, ClassLoader contextClassLoader) throws E {
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(contextClassLoader)) {
+ return supplier.get();
+ }
+ }
+
+ public static CompletableFuture guardCompletionWithContextClassLoader(
+ CompletableFuture future, ClassLoader contextClassLoader) {
+ final CompletableFuture guardedFuture = new CompletableFuture<>();
+ future.whenComplete(
+ (value, throwable) ->
+ runWithContextClassLoader(
+ () -> FutureUtils.doForward(value, throwable, guardedFuture),
+ contextClassLoader));
+ return guardedFuture;
+ }
+
+ /**
+ * An {@link Executor} wrapper that temporarily resets the ContextClassLoader to the given
+ * ClassLoader.
+ */
+ private static class ContextClassLoaderSettingExecutor implements Executor {
+
+ private final Executor backingExecutor;
+ private final ClassLoader contextClassLoader;
+
+ public ContextClassLoaderSettingExecutor(
+ Executor backingExecutor, ClassLoader contextClassLoader) {
+ this.backingExecutor = backingExecutor;
+ this.contextClassLoader = contextClassLoader;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ backingExecutor.execute(
+ ClassLoadingUtils.withContextClassLoader(command, contextClassLoader));
+ }
+ }
+}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 273d9522f0655..b4fd6ab4e9231 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -55,6 +55,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -77,6 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
private final ActorRef rpcEndpoint;
+ private final ClassLoader flinkClassLoader;
+
// whether the actor ref is local and thus no message serialization is needed
protected final boolean isLocal;
@@ -97,11 +100,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture terminationFuture,
- boolean captureAskCallStack) {
+ boolean captureAskCallStack,
+ ClassLoader flinkClassLoader) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
+ this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader);
this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
@@ -383,7 +388,10 @@ protected void tell(Object message) {
* @return Response future
*/
protected CompletableFuture> ask(Object message, Time timeout) {
- return AkkaFutureUtils.toJava(Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
+ final CompletableFuture> response =
+ AkkaFutureUtils.toJava(
+ Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
+ return guardCompletionWithContextClassLoader(response, flinkClassLoader);
}
@Override
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 588746199e9d5..d86d7a464f5e7 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -59,6 +59,7 @@
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -86,6 +87,8 @@ class AkkaRpcActor extends AbstractActor {
/** the endpoint to invoke the methods on. */
protected final T rpcEndpoint;
+ private final ClassLoader flinkClassLoader;
+
/** the helper that tracks whether calls come from the main thread. */
private final MainThreadValidatorUtil mainThreadValidator;
@@ -105,10 +108,12 @@ class AkkaRpcActor extends AbstractActor {
final T rpcEndpoint,
final CompletableFuture terminationFuture,
final int version,
- final long maximumFramesize) {
+ final long maximumFramesize,
+ final ClassLoader flinkClassLoader) {
checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
+ this.flinkClassLoader = checkNotNull(flinkClassLoader);
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture);
this.version = version;
@@ -177,13 +182,13 @@ private void handleControlMessage(ControlMessages controlMessage) {
try {
switch (controlMessage) {
case START:
- state = state.start(this);
+ state = state.start(this, flinkClassLoader);
break;
case STOP:
state = state.stop();
break;
case TERMINATE:
- state = state.terminate(this);
+ state = state.terminate(this, flinkClassLoader);
break;
default:
handleUnknownControlMessage(controlMessage);
@@ -296,13 +301,21 @@ private void handleRpcInvocation(RpcInvocation rpcInvocation) {
// this supports declaration of anonymous classes
rpcMethod.setAccessible(true);
+ final Method capturedRpcMethod = rpcMethod;
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
- rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ runWithContextClassLoader(
+ () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
+ flinkClassLoader);
} else {
final Object result;
try {
- result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+ result =
+ runWithContextClassLoader(
+ () ->
+ capturedRpcMethod.invoke(
+ rpcEndpoint, rpcInvocation.getArgs()),
+ flinkClassLoader);
} catch (InvocationTargetException e) {
log.debug(
"Reporting back error thrown in remote procedure {}", rpcMethod, e);
@@ -416,7 +429,9 @@ private Either serializeRemoteResultAn
*/
private void handleCallAsync(CallAsync callAsync) {
try {
- Object result = callAsync.getCallable().call();
+ Object result =
+ runWithContextClassLoader(
+ () -> callAsync.getCallable().call(), flinkClassLoader);
getSender().tell(new Status.Success(result), getSelf());
} catch (Throwable e) {
@@ -437,7 +452,7 @@ private void handleRunAsync(RunAsync runAsync) {
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {
// run immediately
try {
- runAsync.getRunnable().run();
+ runWithContextClassLoader(() -> runAsync.getRunnable().run(), flinkClassLoader);
} catch (Throwable t) {
log.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
@@ -510,7 +525,7 @@ private void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
// ---------------------------------------------------------------------------
interface State {
- default State start(AkkaRpcActor> akkaRpcActor) {
+ default State start(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
throw new AkkaRpcInvalidStateException(
invalidStateTransitionMessage(StartedState.STARTED));
}
@@ -520,7 +535,7 @@ default State stop() {
invalidStateTransitionMessage(StoppedState.STOPPED));
}
- default State terminate(AkkaRpcActor> akkaRpcActor) {
+ default State terminate(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
throw new AkkaRpcInvalidStateException(
invalidStateTransitionMessage(TerminatingState.TERMINATING));
}
@@ -545,7 +560,7 @@ enum StartedState implements State {
STARTED;
@Override
- public State start(AkkaRpcActor> akkaRpcActor) {
+ public State start(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
return STARTED;
}
@@ -555,12 +570,15 @@ public State stop() {
}
@Override
- public State terminate(AkkaRpcActor> akkaRpcActor) {
+ public State terminate(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
akkaRpcActor.mainThreadValidator.enterMainThread();
CompletableFuture terminationFuture;
try {
- terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
+ terminationFuture =
+ runWithContextClassLoader(
+ () -> akkaRpcActor.rpcEndpoint.internalCallOnStop(),
+ flinkClassLoader);
} catch (Throwable t) {
terminationFuture =
FutureUtils.completedExceptionally(
@@ -598,11 +616,12 @@ enum StoppedState implements State {
STOPPED;
@Override
- public State start(AkkaRpcActor> akkaRpcActor) {
+ public State start(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
akkaRpcActor.mainThreadValidator.enterMainThread();
try {
- akkaRpcActor.rpcEndpoint.internalCallOnStart();
+ runWithContextClassLoader(
+ () -> akkaRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);
} catch (Throwable throwable) {
akkaRpcActor.stop(
RpcEndpointTerminationResult.failure(
@@ -624,7 +643,7 @@ public State stop() {
}
@Override
- public State terminate(AkkaRpcActor> akkaRpcActor) {
+ public State terminate(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
akkaRpcActor.stop(RpcEndpointTerminationResult.success());
return TerminatingState.TERMINATING;
@@ -636,7 +655,7 @@ enum TerminatingState implements State {
TERMINATING;
@Override
- public State terminate(AkkaRpcActor> akkaRpcActor) {
+ public State terminate(AkkaRpcActor> akkaRpcActor, ClassLoader flinkClassLoader) {
return TERMINATING;
}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4c1a788299767..c2b20fbe20a1a 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -78,6 +78,9 @@
import scala.Option;
import scala.reflect.ClassTag$;
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader;
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader;
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.withContextClassLoader;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -98,6 +101,8 @@ public class AkkaRpcService implements RpcService {
private final ActorSystem actorSystem;
private final AkkaRpcServiceConfiguration configuration;
+ private final ClassLoader flinkClassLoader;
+
@GuardedBy("lock")
private final Map actors = new HashMap<>(4);
@@ -117,8 +122,16 @@ public class AkkaRpcService implements RpcService {
@VisibleForTesting
public AkkaRpcService(
final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
+ this(actorSystem, configuration, AkkaRpcService.class.getClassLoader());
+ }
+
+ AkkaRpcService(
+ final ActorSystem actorSystem,
+ final AkkaRpcServiceConfiguration configuration,
+ final ClassLoader flinkClassLoader) {
this.actorSystem = checkNotNull(actorSystem, "actor system");
this.configuration = checkNotNull(configuration, "akka rpc service configuration");
+ this.flinkClassLoader = checkNotNull(flinkClassLoader, "flinkClassLoader");
Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
@@ -136,7 +149,14 @@ public AkkaRpcService(
captureAskCallstacks = configuration.captureAskCallStack();
- internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
+ // Akka always sets the threads context class loader to the class loader with which it was
+ // loaded (i.e., the plugin class loader)
+ // we must ensure that the context class loader is set to the Flink class loader when we
+ // call into Flink
+ // otherwise we could leak the plugin class loader or poison the context class loader of
+ // external threads (because they inherit the current threads context class loader)
+ internalScheduledExecutor =
+ new ActorSystemScheduledExecutorAdapter(actorSystem, flinkClassLoader);
terminationFuture = new CompletableFuture<>();
@@ -158,7 +178,9 @@ private Supervisor startSupervisorActor() {
new ExecutorThreadFactory(
"AkkaRpcService-Supervisor-Termination-Future-Executor"));
final ActorRef actorRef =
- SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);
+ SupervisorActor.startSupervisorActor(
+ actorSystem,
+ withContextClassLoader(terminationFutureExecutor, flinkClassLoader));
return Supervisor.create(actorRef, terminationFutureExecutor);
}
@@ -199,7 +221,8 @@ public CompletableFuture connect(
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
- captureAskCallstacks);
+ captureAskCallstacks,
+ flinkClassLoader);
});
}
@@ -221,7 +244,8 @@ public > CompletableFuture
configuration.getMaximumFramesize(),
null,
() -> fencingToken,
- captureAskCallstacks);
+ captureAskCallstacks,
+ flinkClassLoader);
});
}
@@ -268,7 +292,8 @@ public RpcServer startServer(C rpcEndpoint)
configuration.getMaximumFramesize(),
actorTerminationFuture,
((FencedRpcEndpoint>) rpcEndpoint)::getFencingToken,
- captureAskCallstacks);
+ captureAskCallstacks,
+ flinkClassLoader);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
@@ -280,7 +305,8 @@ public RpcServer startServer(C rpcEndpoint)
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorTerminationFuture,
- captureAskCallstacks);
+ captureAskCallstacks,
+ flinkClassLoader);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -322,7 +348,8 @@ SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) {
rpcEndpoint,
actorTerminationFuture,
getVersion(),
- configuration.getMaximumFramesize()),
+ configuration.getMaximumFramesize(),
+ flinkClassLoader),
rpcEndpoint.getEndpointId());
final SupervisorActor.ActorRegistration actorRegistration =
@@ -354,7 +381,8 @@ public RpcServer fenceRpcServer(RpcServer rpcServer, F
configuration.getMaximumFramesize(),
null,
() -> fencingToken,
- captureAskCallstacks);
+ captureAskCallstacks,
+ flinkClassLoader);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -424,11 +452,9 @@ public CompletableFuture stopService() {
actorSystemTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
- if (throwable != null) {
- terminationFuture.completeExceptionally(throwable);
- } else {
- terminationFuture.complete(null);
- }
+ runWithContextClassLoader(
+ () -> FutureUtils.doForward(ignored, throwable, terminationFuture),
+ flinkClassLoader);
LOG.info("Stopped Akka RPC service.");
});
@@ -535,27 +561,33 @@ clazz, getVersion()),
HandshakeSuccessMessage
.class))));
- return actorRefFuture.thenCombineAsync(
- handshakeFuture,
- (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
- InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
-
- // Rather than using the System ClassLoader directly, we derive the ClassLoader
- // from this class . That works better in cases where Flink runs embedded and
- // all Flink
- // code is loaded dynamically (for example from an OSGI bundle) through a custom
- // ClassLoader
- ClassLoader classLoader = getClass().getClassLoader();
-
- @SuppressWarnings("unchecked")
- C proxy =
- (C)
- Proxy.newProxyInstance(
- classLoader, new Class>[] {clazz}, invocationHandler);
-
- return proxy;
- },
- actorSystem.dispatcher());
+ final CompletableFuture gatewayFuture =
+ actorRefFuture.thenCombineAsync(
+ handshakeFuture,
+ (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
+ InvocationHandler invocationHandler =
+ invocationHandlerFactory.apply(actorRef);
+
+ // Rather than using the System ClassLoader directly, we derive the
+ // ClassLoader from this class.
+ // That works better in cases where Flink runs embedded and
+ // all Flink code is loaded dynamically
+ // (for example from an OSGI bundle) through a custom ClassLoader
+ ClassLoader classLoader = getClass().getClassLoader();
+
+ @SuppressWarnings("unchecked")
+ C proxy =
+ (C)
+ Proxy.newProxyInstance(
+ classLoader,
+ new Class>[] {clazz},
+ invocationHandler);
+
+ return proxy;
+ },
+ actorSystem.dispatcher());
+
+ return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
private CompletableFuture resolveActorAddress(String address) {
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index ae23abb100d34..2e91569d933e9 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -23,9 +23,12 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
+import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.flink.util.function.TriFunction;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
@@ -38,7 +41,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
-import java.util.function.BiFunction;
import static org.apache.flink.util.NetUtils.isValidClientPort;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -58,10 +60,10 @@ public class AkkaRpcServiceUtils {
static final String SUPERVISOR_NAME = "rpc";
private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
- "akka {remote {netty.tcp {maximum-frame-size = %s}}}";
+ "akka {remote.classic {netty.tcp {maximum-frame-size = %s}}}";
private static final String MAXIMUM_FRAME_SIZE_PATH =
- "akka.remote.netty.tcp.maximum-frame-size";
+ "akka.remote.classic.netty.tcp.maximum-frame-size";
// ------------------------------------------------------------------------
// RPC instantiation
@@ -326,7 +328,8 @@ public AkkaRpcService createAndStart() throws Exception {
}
public AkkaRpcService createAndStart(
- BiFunction constructor)
+ TriFunction
+ constructor)
throws Exception {
if (actorSystemExecutorConfiguration == null) {
actorSystemExecutorConfiguration =
@@ -336,32 +339,39 @@ public AkkaRpcService createAndStart(
final ActorSystem actorSystem;
- if (externalAddress == null) {
- // create local actor system
- actorSystem =
- AkkaBootstrapTools.startLocalActorSystem(
- configuration,
- actorSystemName,
- logger,
- actorSystemExecutorConfiguration,
- customConfig);
- } else {
- // create remote actor system
- actorSystem =
- AkkaBootstrapTools.startRemoteActorSystem(
- configuration,
- actorSystemName,
- externalAddress,
- externalPortRange,
- bindAddress,
- Optional.ofNullable(bindPort),
- logger,
- actorSystemExecutorConfiguration,
- customConfig);
+ // akka internally caches the context class loader
+ // make sure it uses the plugin class loader
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
+ if (externalAddress == null) {
+ // create local actor system
+ actorSystem =
+ AkkaBootstrapTools.startLocalActorSystem(
+ configuration,
+ actorSystemName,
+ logger,
+ actorSystemExecutorConfiguration,
+ customConfig);
+ } else {
+ // create remote actor system
+ actorSystem =
+ AkkaBootstrapTools.startRemoteActorSystem(
+ configuration,
+ actorSystemName,
+ externalAddress,
+ externalPortRange,
+ bindAddress,
+ Optional.ofNullable(bindPort),
+ logger,
+ actorSystemExecutorConfiguration,
+ customConfig);
+ }
}
return constructor.apply(
- actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
+ actorSystem,
+ AkkaRpcServiceConfiguration.fromConfiguration(configuration),
+ RpcService.class.getClassLoader());
}
}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 8557896f8734d..134c247f7769f 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -64,7 +64,8 @@ public FencedAkkaInvocationHandler(
long maximumFramesize,
@Nullable CompletableFuture terminationFuture,
Supplier fencingTokenSupplier,
- boolean captureAskCallStacks) {
+ boolean captureAskCallStacks,
+ ClassLoader flinkClassLoader) {
super(
address,
hostname,
@@ -72,7 +73,8 @@ public FencedAkkaInvocationHandler(
timeout,
maximumFramesize,
terminationFuture,
- captureAskCallStacks);
+ captureAskCallStacks,
+ flinkClassLoader);
this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index d35e92becc7f5..56220facf25a9 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -45,8 +45,9 @@ public FencedAkkaRpcActor(
T rpcEndpoint,
CompletableFuture terminationFuture,
int version,
- final long maximumFramesize) {
- super(rpcEndpoint, terminationFuture, version, maximumFramesize);
+ final long maximumFramesize,
+ ClassLoader flinkClassLoader) {
+ super(rpcEndpoint, terminationFuture, version, maximumFramesize, flinkClassLoader);
}
@Override
diff --git a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE
index e00197115fd65..235f9525093c1 100644
--- a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE
+++ b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE
@@ -6,8 +6,35 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- com.typesafe.akka:akka-remote_2.11:2.5.21
+- com.hierynomus:asn-one:0.5.0
+- com.typesafe:config:1.4.0
+- com.typesafe:ssl-config-core_2.12:0.4.2
+- com.typesafe.akka:akka-actor_2.12:2.6.15
+- com.typesafe.akka:akka-remote_2.12:2.6.15
+- com.typesafe.akka:akka-pki_2.12:2.6.15
+- com.typesafe.akka:akka-protobuf-v3_2.12:2.6.15
+- com.typesafe.akka:akka-slf4j_2.12:2.6.15
+- com.typesafe.akka:akka-stream_2.12:2.6.15
- io.netty:netty:3.10.6.Final
+- org.agrona:agrona:1.9.0
+
+This project bundles the following dependencies under the BSD license.
+See bundled license files for details.
+
+- org.clapper:grizzled-slf4j_2.12:1.3.2
+
+The following dependencies all share the same BSD license which you find under licenses/LICENSE.scala.
+
+- org.scala-lang:scala-compiler:2.12.7
+- org.scala-lang:scala-library:2.12.7
+- org.scala-lang:scala-reflect:2.12.7
+- org.scala-lang.modules:scala-java8-compat_2.12:0.8.0
+- org.scala-lang.modules:scala-parser-combinators_2.12:1.0.4
+- org.scala-lang.modules:scala-xml_2.12:1.0.6
+
+This project bundles the following dependencies under the Creative Commons CC0 "No Rights Reserved".
+
+- org.reactivestreams:reactive-streams:1.0.3
This project bundles io.netty:netty:3.10.6.Final from which it inherits the following notices:
@@ -42,4 +69,4 @@ WebSocket and HTTP server:
* LICENSE:
* licenses/LICENSE.webbit (BSD License)
* HOMEPAGE:
- * https://github.com/joewalnes/webbit
+ * https://github.com/joewalnes/webbit
\ No newline at end of file
diff --git a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala
index 20d63491a95fa..323b5a3e5c592 100644
--- a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala
+++ b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala
@@ -299,6 +299,7 @@ object AkkaUtils {
| guardian-supervisor-strategy = $supervisorStrategy
|
| warn-about-java-serializer-usage = off
+ | allow-java-serialization = on
|
| default-dispatcher {
| throughput = $akkaThroughput
@@ -470,9 +471,10 @@ object AkkaUtils {
| provider = "akka.remote.RemoteActorRefProvider"
| }
|
- | remote {
- | startup-timeout = $startupTimeout
+ | remote.artery.enabled = false
+ | remote.startup-timeout = $startupTimeout
|
+ | remote.classic {
| # disable the transport failure detector by setting very high values
| transport-failure-detector{
| acceptable-heartbeat-pause = 6000 s
@@ -480,6 +482,8 @@ object AkkaUtils {
| threshold = 300
| }
|
+ | enabled-transports = ["akka.remote.classic.netty.tcp"]
+ |
| netty {
| tcp {
| transport-class = "akka.remote.transport.netty.NettyTransport"
@@ -522,7 +526,7 @@ object AkkaUtils {
val hostnameConfigString =
s"""
|akka {
- | remote {
+ | remote.classic {
| netty {
| tcp {
| hostname = "$effectiveHostname"
@@ -536,13 +540,13 @@ object AkkaUtils {
val sslConfigString = if (akkaEnableSSLConfig) {
s"""
|akka {
- | remote {
+ | remote.classic {
|
- | enabled-transports = ["akka.remote.netty.ssl"]
+ | enabled-transports = ["akka.remote.classic.netty.ssl"]
|
| netty {
|
- | ssl = $${akka.remote.netty.tcp}
+ | ssl = $${akka.remote.classic.netty.tcp}
|
| ssl {
|
diff --git a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala
index fe92eea0f0e4a..f5247e8e59ba9 100644
--- a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala
+++ b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala
@@ -26,7 +26,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.util.FingerprintTrust
class CustomSSLEngineProvider(system : akka.actor.ActorSystem)
extends ConfigSSLEngineProvider(system) {
- private val securityConfig = system.settings.config.getConfig("akka.remote.netty.ssl.security")
+ private val securityConfig = system.settings.config.
+ getConfig("akka.remote.classic.netty.ssl.security")
private val SSLTrustStore = securityConfig.getString("trust-store")
private val SSLTrustStorePassword = securityConfig.getString("trust-store-password")
private val SSLCertFingerprints = securityConfig.getStringList("cert-fingerprints")
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java
new file mode 100644
index 0000000000000..115e185f60cec
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent.akka;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/** Tests for the {@link ClassLoadingUtils}. */
+public class ClassLoadingUtilsTest extends TestLogger {
+
+ private static final ClassLoader TEST_CLASS_LOADER =
+ new URLClassLoader(new URL[0], ClassLoadingUtilsTest.class.getClassLoader());
+
+ @Test
+ public void testRunnableWithContextClassLoader() throws Exception {
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ Runnable runnable =
+ () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader());
+
+ final Runnable wrappedRunnable =
+ ClassLoadingUtils.withContextClassLoader(runnable, TEST_CLASS_LOADER);
+
+ // the runnable should only be wrapped, not run immediately
+ assertThat(contextClassLoader.isDone(), is(false));
+
+ wrappedRunnable.run();
+ assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER));
+ }
+
+ @Test
+ public void testExecutorWithContextClassLoader() throws Exception {
+ final Executor wrappedExecutor =
+ ClassLoadingUtils.withContextClassLoader(
+ Executors.newDirectExecutorService(), TEST_CLASS_LOADER);
+
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ Runnable runnable =
+ () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader());
+
+ wrappedExecutor.execute(runnable);
+ assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER));
+ }
+
+ @Test
+ public void testRunRunnableWithContextClassLoader() throws Exception {
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ ThrowingRunnable runnable =
+ () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader());
+
+ ClassLoadingUtils.runWithContextClassLoader(runnable, TEST_CLASS_LOADER);
+ assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER));
+ }
+
+ @Test
+ public void testRunSupplierWithContextClassLoader() throws Exception {
+ SupplierWithException runnable =
+ () -> Thread.currentThread().getContextClassLoader();
+
+ final ClassLoader contextClassLoader =
+ ClassLoadingUtils.runWithContextClassLoader(runnable, TEST_CLASS_LOADER);
+ assertThat(contextClassLoader, is(TEST_CLASS_LOADER));
+ }
+
+ @Test
+ public void testGuardCompletionWithContextClassLoader() throws Exception {
+ final CompletableFuture originalFuture = new CompletableFuture<>();
+
+ final CompletableFuture guardedFuture =
+ ClassLoadingUtils.guardCompletionWithContextClassLoader(
+ originalFuture, TEST_CLASS_LOADER);
+
+ final CompletableFuture contextClassLoader =
+ guardedFuture.thenApply(ignored -> Thread.currentThread().getContextClassLoader());
+
+ originalFuture.complete(null);
+ assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER));
+ }
+}
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
new file mode 100644
index 0000000000000..1a585a1f6518f
--- /dev/null
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests the context class loader handling in various parts of the akka rpc system.
+ *
+ * The tests check cases where we call from the akka rpc system into Flink, in which case the
+ * context class loader must be set to the Flink class loader. This ensures that the Akka class
+ * loader does not get accidentally leaked, e.g., via thread locals or thread pools on the Flink
+ * side.
+ */
+public class ContextClassLoadingSettingTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.milliseconds(10000L);
+
+ // Many of the contained tests assert that a future is completed with a specific context class
+ // loader by applying a synchronous operation.
+ // If the initial future is completed by the time we apply the synchronous operation the test
+ // thread will execute the operation instead. The tests are thus susceptible to timing issues.
+ // We hence take a probabilistic approach: Assume that this timing is rare, guard these calls in
+ // the test with a temporary class loader context, and assert that the actually used
+ // context class loader is _either_ the one we truly expect or the temporary one.
+ private static final ClassLoader testClassLoader =
+ new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
+
+ private ClassLoader pretendFlinkClassLoader;
+ private ActorSystem actorSystem;
+ private AkkaRpcService akkaRpcService;
+
+ @Before
+ public void setup() {
+ pretendFlinkClassLoader =
+ new URLClassLoader(
+ new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
+ actorSystem = AkkaUtils.createDefaultActorSystem();
+ akkaRpcService =
+ new AkkaRpcService(
+ actorSystem,
+ AkkaRpcServiceConfiguration.defaultConfiguration(),
+ pretendFlinkClassLoader);
+ }
+
+ @After
+ public void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
+ final CompletableFuture rpcTerminationFuture = akkaRpcService.stopService();
+ final CompletableFuture actorSystemTerminationFuture =
+ AkkaFutureUtils.toJava(actorSystem.terminate());
+
+ FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ actorSystem = null;
+ akkaRpcService = null;
+ }
+
+ @Test
+ public void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader()
+ throws ExecutionException, InterruptedException {
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ akkaRpcService.execute(
+ () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()));
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+
+ @Test
+ public void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader()
+ throws ExecutionException, InterruptedException {
+ final CompletableFuture contextClassLoader =
+ akkaRpcService.execute(() -> Thread.currentThread().getContextClassLoader());
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+
+ @Test
+ public void testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader()
+ throws ExecutionException, InterruptedException {
+
+ final CompletableFuture blocker = new CompletableFuture<>();
+
+ final CompletableFuture contextClassLoader =
+ runWithContextClassLoader(
+ () ->
+ akkaRpcService
+ .execute((Callable) blocker::get)
+ .thenApply(
+ ignored ->
+ Thread.currentThread()
+ .getContextClassLoader()),
+ testClassLoader);
+ blocker.complete(null);
+
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+
+ @Test
+ public void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader()
+ throws ExecutionException, InterruptedException {
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ akkaRpcService.scheduleRunnable(
+ () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()),
+ 5,
+ TimeUnit.MILLISECONDS);
+ assertThat(contextClassLoader.get(), is(pretendFlinkClassLoader));
+ }
+
+ @Test
+ public void testAkkaRpcService_ConnectFutureCompletedWithFlinkContextClassLoader()
+ throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+
+ final ClassLoader contextClassLoader =
+ runWithContextClassLoader(
+ () ->
+ akkaRpcService
+ .connect(
+ testEndpoint.getAddress(),
+ TestEndpointGateway.class)
+ .thenApply(
+ ignored ->
+ Thread.currentThread()
+ .getContextClassLoader())
+ .get(),
+ testClassLoader);
+ assertIsFlinkClassLoader(contextClassLoader);
+ }
+ }
+
+ @Test
+ public void testAkkaRpcService_TerminationFutureCompletedWithFlinkContextClassLoader()
+ throws Exception {
+ final ClassLoader contextClassLoader =
+ runWithContextClassLoader(
+ () ->
+ akkaRpcService
+ .stopService()
+ .thenApply(
+ ignored ->
+ Thread.currentThread()
+ .getContextClassLoader())
+ .get(),
+ testClassLoader);
+
+ assertIsFlinkClassLoader(contextClassLoader);
+ }
+
+ @Test
+ public void testAkkaRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+ assertIsFlinkClassLoader(testEndpoint.onStartClassLoader.get());
+ }
+ }
+
+ @Test
+ public void testAkkaRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception {
+ final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+ testEndpoint.start();
+ testEndpoint.close();
+
+ assertIsFlinkClassLoader(testEndpoint.onStopClassLoader.get());
+ }
+
+ @Test
+ public void testAkkaRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+
+ final CompletableFuture contextClassLoader = testEndpoint.doCallAsync();
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+ }
+
+ @Test
+ public void testAkkaRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+
+ final CompletableFuture contextClassLoader = testEndpoint.doRunAsync();
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+ }
+
+ @Test
+ public void testAkkaRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader()
+ throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+
+ final TestEndpointGateway testEndpointGateway =
+ akkaRpcService
+ .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
+ .get();
+ testEndpointGateway.doSomethingWithoutReturningAnything();
+
+ assertIsFlinkClassLoader(testEndpoint.voidOperationClassLoader.get());
+ }
+ }
+
+ @Test
+ public void testAkkaRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+
+ final TestEndpointGateway testEndpointGateway =
+ akkaRpcService
+ .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
+ .get();
+ final ClassLoader contextClassLoader =
+ testEndpointGateway.getContextClassLoader().get();
+ assertIsFlinkClassLoader(contextClassLoader);
+ }
+ }
+
+ @Test
+ public void testAkkaRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader()
+ throws Exception {
+ try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) {
+ testEndpoint.start();
+
+ final TestEndpointGateway testEndpointGateway =
+ akkaRpcService
+ .connect(testEndpoint.getAddress(), TestEndpointGateway.class)
+ .get();
+ final CompletableFuture contextClassLoader =
+ runWithContextClassLoader(
+ () ->
+ testEndpointGateway
+ .doSomethingAsync()
+ .thenApply(
+ ignored ->
+ Thread.currentThread()
+ .getContextClassLoader()),
+ testClassLoader);
+ testEndpoint.completeRPCFuture();
+
+ assertIsFlinkClassLoader(contextClassLoader.get());
+ }
+ }
+
+ @Test
+ public void testSupervisorActor_TerminationFutureCompletedWithFlinkContextClassLoader()
+ throws Exception {
+ final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+ testEndpoint.start();
+
+ final ClassLoader contextClassLoader =
+ runWithContextClassLoader(
+ () ->
+ testEndpoint
+ .closeAsync()
+ .thenApply(
+ ignored ->
+ Thread.currentThread()
+ .getContextClassLoader())
+ .get(),
+ testClassLoader);
+
+ assertIsFlinkClassLoader(contextClassLoader);
+ }
+
+ private void assertIsFlinkClassLoader(ClassLoader classLoader) {
+ assertThat(classLoader, either(is(pretendFlinkClassLoader)).or(is(testClassLoader)));
+ }
+
+ private interface TestEndpointGateway extends RpcGateway {
+ CompletableFuture getContextClassLoader();
+
+ CompletableFuture doSomethingAsync();
+
+ CompletableFuture doCallAsync();
+
+ CompletableFuture doRunAsync();
+
+ void doSomethingWithoutReturningAnything();
+ }
+
+ private static class TestEndpoint extends RpcEndpoint implements TestEndpointGateway {
+
+ private final CompletableFuture onStartClassLoader = new CompletableFuture<>();
+ private final CompletableFuture onStopClassLoader = new CompletableFuture<>();
+ private final CompletableFuture voidOperationClassLoader =
+ new CompletableFuture<>();
+ private final CompletableFuture rpcResponseFuture = new CompletableFuture<>();
+
+ protected TestEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @Override
+ protected void onStart() throws Exception {
+ onStartClassLoader.complete(Thread.currentThread().getContextClassLoader());
+ super.onStart();
+ }
+
+ @Override
+ protected CompletableFuture onStop() {
+ onStopClassLoader.complete(Thread.currentThread().getContextClassLoader());
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture doSomethingAsync() {
+ return rpcResponseFuture;
+ }
+
+ @Override
+ public CompletableFuture doCallAsync() {
+ return callAsync(
+ () -> Thread.currentThread().getContextClassLoader(),
+ Time.of(10, TimeUnit.SECONDS));
+ }
+
+ @Override
+ public CompletableFuture doRunAsync() {
+ final CompletableFuture contextClassLoader = new CompletableFuture<>();
+ runAsync(
+ () ->
+ contextClassLoader.complete(
+ Thread.currentThread().getContextClassLoader()));
+ return contextClassLoader;
+ }
+
+ @Override
+ public void doSomethingWithoutReturningAnything() {
+ voidOperationClassLoader.complete(Thread.currentThread().getContextClassLoader());
+ }
+
+ public void completeRPCFuture() {
+ rpcResponseFuture.complete(null);
+ }
+
+ @Override
+ public CompletableFuture getContextClassLoader() {
+ return CompletableFuture.completedFuture(
+ Thread.currentThread().getContextClassLoader());
+ }
+ }
+}
diff --git a/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala b/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala
index 5b65eaf0c7ecc..eb11cbae7541e 100644
--- a/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala
+++ b/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala
@@ -65,7 +65,7 @@ class AkkaUtilsTest
}
test("getHostFromAkkaURL should return host after at sign") {
- val url = "akka://flink@localhost:1234/user/jobmanager"
+ val url = "akka.tcp://flink@localhost:1234/user/jobmanager"
val expected = new InetSocketAddress("localhost", 1234)
val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url)
@@ -146,14 +146,14 @@ class AkkaUtilsTest
val akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port)
- akkaConfig.getString("akka.remote.netty.tcp.hostname") should
+ akkaConfig.getString("akka.remote.classic.netty.tcp.hostname") should
equal(NetUtils.unresolvedHostToNormalizedString(hostname))
}
test("null hostname should go to localhost") {
val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772)))
- val hostname = configure.getString("akka.remote.netty.tcp.hostname")
+ val hostname = configure.getString("akka.remote.classic.netty.tcp.hostname")
InetAddress.getByName(hostname).isLoopbackAddress should be(true)
}
@@ -196,7 +196,7 @@ class AkkaUtilsTest
val akkaConfig = AkkaUtils.getAkkaConfig(configuration, ipv6AddressString, port)
- akkaConfig.getString("akka.remote.netty.tcp.hostname") should
+ akkaConfig.getString("akka.remote.classic.netty.tcp.hostname") should
equal(NetUtils.unresolvedHostToNormalizedString(ipv6AddressString))
}
@@ -216,7 +216,7 @@ class AkkaUtilsTest
val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
- val sslConfig = akkaConfig.getConfig("akka.remote.netty.ssl")
+ val sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl")
sslConfig.getString("ssl-engine-provider") should
equal("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider")
@@ -233,7 +233,7 @@ class AkkaUtilsTest
val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
- val sslConfig = akkaConfig.getConfig("akka.remote.netty.ssl")
+ val sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl")
sslConfig.getString("ssl-engine-provider") should
equal("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider")
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index 5934c607d4d24..9f76e8d68485e 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -88,8 +88,7 @@ static RpcSystem load() {
* @return loaded RpcSystem
*/
static RpcSystem load(Configuration config) {
- final ClassLoader classLoader = RpcSystem.class.getClassLoader();
- return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next();
+ return ServiceLoader.load(RpcSystemLoader.class).iterator().next().loadRpcSystem(config);
}
/** Descriptor for creating a fork-join thread-pool. */
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java
new file mode 100644
index 0000000000000..aff47a58325a5
--- /dev/null
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.configuration.Configuration;
+
+/** A loader for an {@link RpcSystem}. */
+public interface RpcSystemLoader {
+ RpcSystem loadRpcSystem(Configuration config);
+}
diff --git a/flink-rpc/pom.xml b/flink-rpc/pom.xml
index dd405eddf454a..42aba1e38343d 100644
--- a/flink-rpc/pom.xml
+++ b/flink-rpc/pom.xml
@@ -36,5 +36,6 @@ under the License.
flink-rpc-core
flink-rpc-akka
+ flink-rpc-akka-loader
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 6fe9c9102c3bb..a7646808ea081 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -48,7 +48,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
@@ -94,7 +94,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
@@ -114,7 +114,7 @@ under the License.
org.apache.flink
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
${project.version}
test
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 7e8d1c52e7786..d29c369f8b57e 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -29,7 +29,7 @@ under the License.
..
- flink-runtime_${scala.binary.version}
+ flink-runtime
Flink : Runtime
jar
@@ -52,7 +52,7 @@ under the License.
org.apache.flink
- flink-rpc-akka_${scala.binary.version}
+ flink-rpc-akka-loader
${project.version}
@@ -175,7 +175,14 @@ under the License.
org.apache.flink
- flink-rpc-akka_${scala.binary.version}
+ flink-rpc-akka
+ ${project.version}
+ test
+
+
+
+ org.apache.flink
+ flink-rpc-akka
${project.version}
test
test-jar
@@ -304,6 +311,33 @@ under the License.
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-rpc-akka-jars
+ process-resources
+
+ copy
+
+
+
+
+ org.apache.flink
+ flink-rpc-akka
+ ${project.version}
+ jar
+ true
+ flink-rpc-akka.jar
+
+
+ ${project.build.directory}/classes
+
+
+
+
diff --git a/flink-state-backends/flink-statebackend-changelog/pom.xml b/flink-state-backends/flink-statebackend-changelog/pom.xml
index 0c4e8a5f20dd5..6b7725ed08370 100644
--- a/flink-state-backends/flink-statebackend-changelog/pom.xml
+++ b/flink-state-backends/flink-statebackend-changelog/pom.xml
@@ -31,7 +31,7 @@ under the License.
..
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
Flink : State backends : Changelog
jar
@@ -47,7 +47,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
provided
@@ -66,7 +66,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
test-jar
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/pom.xml b/flink-state-backends/flink-statebackend-heap-spillable/pom.xml
index 8be1989ef8d6c..c07b966bf6320 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/pom.xml
+++ b/flink-state-backends/flink-statebackend-heap-spillable/pom.xml
@@ -31,7 +31,7 @@ under the License.
..
- flink-statebackend-heap-spillable_${scala.binary.version}
+ flink-statebackend-heap-spillable
Flink : State backends : Heap spillable
jar
@@ -41,7 +41,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
provided
@@ -55,7 +55,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
test-jar
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index fc923a175cffc..9e03f4e628c1e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -76,7 +76,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 81643cd34e025..9e72a59d934c6 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -52,7 +52,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
@@ -97,7 +97,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
test-jar
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index e8bc96f16c91c..d8a3e09898af7 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -85,7 +85,7 @@ under the License.
org.apache.flink
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
${project.version}
test
@@ -116,7 +116,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
test-jar
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index 8c13024387e6f..88a0ca7518303 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -128,7 +128,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
@@ -136,7 +136,7 @@ under the License.
org.apache.flink
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
${project.version}
test
diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-java-bridge/pom.xml
index 988b8b81e8d14..47f1264fbacf7 100644
--- a/flink-table/flink-table-api-java-bridge/pom.xml
+++ b/flink-table/flink-table-api-java-bridge/pom.xml
@@ -78,7 +78,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index 767641e0617d7..f636981c3cc85 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -281,7 +281,7 @@ under the License.
org.apache.flink
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
${project.version}
test
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 9f71e67ef79e1..94897529ae6f6 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -44,14 +44,14 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
compile
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
compile
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 1d771bd73dd39..a0219f1380d8f 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -87,14 +87,14 @@ under the License.
org.apache.flink
- flink-optimizer_${scala.binary.version}
+ flink-optimizer
${project.version}
test
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
@@ -187,7 +187,7 @@ under the License.
org.apache.flink
- flink-optimizer_${scala.binary.version}
+ flink-optimizer
${project.version}
test-jar
test
@@ -195,7 +195,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
@@ -249,7 +249,7 @@ under the License.
org.apache.flink
- flink-statebackend-changelog_${scala.binary.version}
+ flink-statebackend-changelog
${project.version}
test
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 6c7ecc2c86d7e..bb4101644a85e 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -63,7 +63,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test
test-jar
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 22860a73f30f7..5a14a9d63f78f 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -43,7 +43,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
@@ -88,7 +88,7 @@ under the License.
org.apache.flink
- flink-runtime_${scala.binary.version}
+ flink-runtime
${project.version}
test-jar
test
diff --git a/pom.xml b/pom.xml
index c2dbb32ae0e70..8b022329fd505 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@ under the License.
true
13.0
18.0
- 2.5.21
1.8
1.7.15
2.14.1
@@ -507,13 +506,6 @@ under the License.
2.1
-
-
- com.typesafe
- config
- 1.3.0
-
-
commons-logging
@@ -662,56 +654,6 @@ under the License.
${scala.version}
-
- org.clapper
- grizzled-slf4j_${scala.binary.version}
- 1.3.2
-
-
-
- com.typesafe.akka
- akka-actor_${scala.binary.version}
- ${akka.version}
-
-
-
- com.typesafe.akka
- akka-remote_${scala.binary.version}
- ${akka.version}
-
-
- io.aeron
- aeron-driver
-
-
- io.aeron
- aeron-client
-
-
-
-
-
-
- com.typesafe.akka
- akka-stream_${scala.binary.version}
- ${akka.version}
-
-
-
-
- com.typesafe.akka
- akka-protobuf_${scala.binary.version}
- ${akka.version}
-
-
-
- com.typesafe.akka
- akka-slf4j_${scala.binary.version}
- ${akka.version}
-
-
org.scala-lang.modules
scala-parser-combinators_${scala.binary.version}
diff --git a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
index d21d26681a542..239bdd5bff0c4 100644
--- a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
+++ b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
@@ -52,6 +52,8 @@ public class ScalaSuffixChecker {
// [INFO] +- org.scala-lang:scala-reflect:jar:2.11.12:test
private static final Pattern scalaSuffixPattern = Pattern.compile("_2.1[0-9]");
+ private static final String AKKA_RPC_MODULE_NAME = "flink-rpc-akka";
+
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: ScalaSuffixChecker ");
@@ -93,6 +95,11 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException
final Matcher matcher = moduleNamePattern.matcher(line);
if (matcher.matches()) {
final String moduleName = stripScalaSuffix(matcher.group(1));
+ // we ignored flink-rpc-akka because it is loaded through a separate class
+ // loader
+ if (moduleName.equals(AKKA_RPC_MODULE_NAME)) {
+ continue;
+ }
LOG.trace("Parsing module '{}'.", moduleName);
// skip: [INFO] org.apache.flink:flink-annotations:jar:1.14-SNAPSHOT
@@ -103,10 +110,14 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException
while (blockPattern.matcher(line).matches()) {
final boolean dependsOnScala = dependsOnScala(line);
final boolean isTestDependency = line.endsWith(":test");
+ // we ignored flink-rpc-akka because it is loaded through a separate class
+ // loader
+ final boolean isFlinkAkkaRpc = line.contains(AKKA_RPC_MODULE_NAME);
LOG.trace("\tline:{}", line);
LOG.trace("\t\tdepends-on-scala:{}", dependsOnScala);
LOG.trace("\t\tis-test-dependency:{}", isTestDependency);
- if (dependsOnScala && !isTestDependency) {
+ LOG.trace("\t\tis-flink-rpc-akka:{}", isFlinkAkkaRpc);
+ if (dependsOnScala && !isTestDependency && !isFlinkAkkaRpc) {
LOG.trace("\t\tOutbreak detected at {}!", moduleName);
infected = true;
}