From b34ef1e44b5d9332c5bd07f8c2c46bc93243debd Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Tue, 29 Jun 2021 22:03:28 +0200 Subject: [PATCH] [FLINK-18783] Load Akka with separate classloader --- .../standalone/kubernetes.md | 4 +- .../standalone/kubernetes.md | 4 +- flink-clients/pom.xml | 6 +- flink-connectors/flink-connector-base/pom.xml | 2 +- .../flink-connector-cassandra/pom.xml | 2 +- .../pom.xml | 2 +- .../flink-connector-elasticsearch5/pom.xml | 2 +- .../flink-connector-elasticsearch6/pom.xml | 2 +- .../flink-connector-elasticsearch7/pom.xml | 2 +- .../flink-connector-files/pom.xml | 2 +- .../flink-connector-gcp-pubsub/pom.xml | 2 +- .../flink-connector-hbase-1.4/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- .../flink-connector-kafka/pom.xml | 4 +- .../flink-connector-kinesis/pom.xml | 4 +- .../flink-connector-rabbitmq/pom.xml | 4 +- flink-container/pom.xml | 2 +- .../flink/configuration/CoreOptions.java | 3 +- .../classloading/SubmoduleClassLoader.java | 45 +++ .../flink/util/concurrent/FutureUtils.java | 26 +- flink-dist/pom.xml | 11 +- flink-dist/src/main/assemblies/opt.xml | 4 +- .../main/flink-bin/conf/log4j-cli.properties | 2 +- .../flink-bin/conf/log4j-console.properties | 2 +- .../flink-bin/conf/log4j-session.properties | 2 +- .../src/main/flink-bin/conf/log4j.properties | 2 +- .../main/flink-bin/conf/logback-console.xml | 2 +- .../src/main/flink-bin/conf/logback.xml | 2 +- flink-dist/src/main/resources/META-INF/NOTICE | 12 - flink-docs/pom.xml | 4 +- .../pom.xml | 2 +- .../flink-end-to-end-tests-common/pom.xml | 2 +- .../flink-metrics-availability-test/pom.xml | 2 +- flink-end-to-end-tests/test-scripts/common.sh | 4 +- .../flink-examples-streaming/pom.xml | 2 +- flink-examples/flink-examples-table/pom.xml | 2 +- flink-formats/flink-avro/pom.xml | 2 +- flink-formats/flink-compress/pom.xml | 2 +- flink-formats/flink-csv/pom.xml | 2 +- flink-formats/flink-hadoop-bulk/pom.xml | 2 +- flink-formats/flink-json/pom.xml | 2 +- flink-formats/flink-orc/pom.xml | 2 +- flink-formats/flink-parquet/pom.xml | 2 +- flink-formats/flink-sequence-file/pom.xml | 2 +- flink-fs-tests/pom.xml | 4 +- flink-kubernetes/pom.xml | 6 +- flink-libraries/flink-cep/pom.xml | 4 +- flink-libraries/flink-gelly/pom.xml | 2 +- .../flink-state-processing-api/pom.xml | 4 +- flink-metrics/flink-metrics-influxdb/pom.xml | 2 +- flink-metrics/flink-metrics-jmx/pom.xml | 2 +- .../flink-metrics-prometheus/pom.xml | 4 +- flink-optimizer/pom.xml | 6 +- flink-python/pom.xml | 2 +- .../flink-queryable-state-runtime/pom.xml | 8 +- flink-rpc/flink-rpc-akka-loader/pom.xml | 127 ++++++ .../runtime/rpc/akka/AkkaRpcSystemLoader.java | 78 ++++ .../rpc/akka/CleanupOnCloseRpcSystem.java | 100 +++++ ...g.apache.flink.runtime.rpc.RpcSystemLoader | 16 + flink-rpc/flink-rpc-akka/pom.xml | 176 ++++---- .../ActorSystemScheduledExecutorAdapter.java | 22 +- .../concurrent/akka/ClassLoadingUtils.java | 119 ++++++ .../rpc/akka/AkkaInvocationHandler.java | 12 +- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 51 ++- .../runtime/rpc/akka/AkkaRpcService.java | 100 +++-- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 64 +-- .../rpc/akka/FencedAkkaInvocationHandler.java | 6 +- .../runtime/rpc/akka/FencedAkkaRpcActor.java | 5 +- .../src/main/resources/META-INF/NOTICE | 31 +- .../flink/runtime/rpc/akka/AkkaUtils.scala | 16 +- .../rpc/akka/CustomSSLEngineProvider.scala | 3 +- .../akka/ClassLoadingUtilsTest.java | 105 +++++ .../akka/ContextClassLoadingSettingTest.java | 381 ++++++++++++++++++ .../runtime/rpc/akka/AkkaUtilsTest.scala | 12 +- .../apache/flink/runtime/rpc/RpcSystem.java | 3 +- .../flink/runtime/rpc/RpcSystemLoader.java | 25 ++ flink-rpc/pom.xml | 1 + flink-runtime-web/pom.xml | 6 +- flink-runtime/pom.xml | 40 +- .../flink-statebackend-changelog/pom.xml | 6 +- .../flink-statebackend-heap-spillable/pom.xml | 6 +- .../flink-statebackend-rocksdb/pom.xml | 2 +- flink-streaming-java/pom.xml | 4 +- flink-streaming-scala/pom.xml | 4 +- flink-table/flink-sql-client/pom.xml | 4 +- .../flink-table-api-java-bridge/pom.xml | 2 +- flink-table/flink-table-planner/pom.xml | 2 +- .../flink-test-utils/pom.xml | 4 +- flink-tests/pom.xml | 10 +- flink-yarn-tests/pom.xml | 2 +- flink-yarn/pom.xml | 4 +- pom.xml | 58 --- .../ci/suffixcheck/ScalaSuffixChecker.java | 13 +- 94 files changed, 1491 insertions(+), 370 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java create mode 100644 flink-rpc/flink-rpc-akka-loader/pom.xml create mode 100644 flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java create mode 100644 flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java create mode 100644 flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader create mode 100644 flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java create mode 100644 flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java create mode 100644 flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java create mode 100644 flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md index b9f51ce71a23a..c68840de0ee44 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md @@ -310,7 +310,7 @@ data: appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler - logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF ``` @@ -380,7 +380,7 @@ data: appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler - logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF ``` diff --git a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md index 98c5d9bba03d8..c81b7fa684e42 100644 --- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md @@ -310,7 +310,7 @@ data: appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler - logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF ``` @@ -380,7 +380,7 @@ data: appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler - logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF ``` diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 0aa4bb45e1277..7f3cb3b07ff52 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -47,13 +47,13 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} org.apache.flink - flink-optimizer_${scala.binary.version} + flink-optimizer ${project.version} @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml index ae2f573af8cdb..8dd85317b1226 100644 --- a/flink-connectors/flink-connector-base/pom.xml +++ b/flink-connectors/flink-connector-base/pom.xml @@ -68,7 +68,7 @@ org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml index cda2891cd1974..117df6af3a4b7 100644 --- a/flink-connectors/flink-connector-cassandra/pom.xml +++ b/flink-connectors/flink-connector-cassandra/pom.xml @@ -183,7 +183,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml index ea25c68ea9fe1..07817accff0ee 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml @@ -94,7 +94,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-connectors/flink-connector-elasticsearch5/pom.xml b/flink-connectors/flink-connector-elasticsearch5/pom.xml index 66d7bc48b1d60..536692c8cb592 100644 --- a/flink-connectors/flink-connector-elasticsearch5/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch5/pom.xml @@ -135,7 +135,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml index b8c89dde033e1..6491887fe69ef 100644 --- a/flink-connectors/flink-connector-elasticsearch6/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml @@ -112,7 +112,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml index ecac13d12f7ac..0141d697e22c9 100644 --- a/flink-connectors/flink-connector-elasticsearch7/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml @@ -110,7 +110,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml index e6f4c2dfd1b10..aeecf0783c820 100644 --- a/flink-connectors/flink-connector-files/pom.xml +++ b/flink-connectors/flink-connector-files/pom.xml @@ -107,7 +107,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-gcp-pubsub/pom.xml b/flink-connectors/flink-connector-gcp-pubsub/pom.xml index ddb0545db80da..2676e42cefecf 100644 --- a/flink-connectors/flink-connector-gcp-pubsub/pom.xml +++ b/flink-connectors/flink-connector-gcp-pubsub/pom.xml @@ -88,7 +88,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-connectors/flink-connector-hbase-1.4/pom.xml b/flink-connectors/flink-connector-hbase-1.4/pom.xml index 5228c886c6360..073c1789736a4 100644 --- a/flink-connectors/flink-connector-hbase-1.4/pom.xml +++ b/flink-connectors/flink-connector-hbase-1.4/pom.xml @@ -348,7 +348,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 99b3b6aa87f35..6ca4fa5f66e52 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -867,7 +867,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 86b7758db8a11..dbe78e48a4cdd 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -72,7 +72,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 6bb8265805d84..9a1fdefe02daa 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -99,7 +99,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -149,7 +149,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index 6fce25ae08a3c..e993068e6a46e 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -180,14 +180,14 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-connectors/flink-connector-rabbitmq/pom.xml b/flink-connectors/flink-connector-rabbitmq/pom.xml index ba65de4f047c5..12b6cd2c1002e 100644 --- a/flink-connectors/flink-connector-rabbitmq/pom.xml +++ b/flink-connectors/flink-connector-rabbitmq/pom.xml @@ -65,14 +65,14 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-container/pom.xml b/flink-container/pom.xml index eb2630c4e96e5..7e5a054af2e84 100644 --- a/flink-container/pom.xml +++ b/flink-container/pom.xml @@ -39,7 +39,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} provided diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index db21d212b492f..b81b409ea24ab 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -183,7 +183,8 @@ public static String[] getPluginParentFirstLoaderPatterns(Configuration config) return parseParentFirstLoaderPatterns(base, append); } - private static String[] parseParentFirstLoaderPatterns(String base, String append) { + @Internal + public static String[] parseParentFirstLoaderPatterns(String base, String append) { Splitter splitter = Splitter.on(';').omitEmptyStrings(); return Iterables.toArray( Iterables.concat(splitter.split(base), splitter.split(append)), String.class); diff --git a/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java b/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java new file mode 100644 index 0000000000000..a3bc222abc597 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.classloading; + +import org.apache.flink.configuration.CoreOptions; + +import java.net.URL; + +/** + * Loads all classes from the submodule jar, except for explicitly white-listed packages. + * + *

To ensure that classes from the submodule are always loaded through the submodule classloader + * (and thus from the submodule jar), even if the classes are also on the classpath (e.g., during + * tests), all classes from the "org.apache.flink" package are loaded child-first. + * + *

Classes related to logging (e.g., log4j) are loaded parent-first. + * + *

All other classes can only be loaded if they are either available in the submodule jar or the + * bootstrap/app classloader (i.e., provided by the JDK). + */ +public class SubmoduleClassLoader extends ComponentClassLoader { + public SubmoduleClassLoader(URL[] classpath, ClassLoader parentClassLoader) { + super( + classpath, + parentClassLoader, + CoreOptions.parseParentFirstLoaderPatterns( + CoreOptions.PARENT_FIRST_LOGGING_PATTERNS, ""), + new String[] {"org.apache.flink"}); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java index bdfee9d41e877..f9a094a1c478c 100644 --- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java @@ -1369,13 +1369,25 @@ public static void throwIfCompletedExceptionally(CompletableFuture future) th } private static BiConsumer forwardTo(CompletableFuture target) { - return (value, throwable) -> { - if (throwable != null) { - target.completeExceptionally(throwable); - } else { - target.complete(value); - } - }; + return (value, throwable) -> doForward(value, throwable, target); + } + + /** + * Completes the given future with either the given value or throwable, depending on which + * parameter is not null. + * + * @param value value with which the future should be completed + * @param throwable throwable with which the future should be completed exceptionally + * @param target future to complete + * @param completed future + */ + public static void doForward( + @Nullable T value, @Nullable Throwable throwable, CompletableFuture target) { + if (throwable != null) { + target.completeExceptionally(throwable); + } else { + target.complete(value); + } } /** diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 0355f2731b35a..f79bbcd3dcb8e 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -61,7 +61,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} @@ -73,7 +73,7 @@ under the License. org.apache.flink - flink-optimizer_${scala.binary.version} + flink-optimizer ${project.version} @@ -376,7 +376,7 @@ under the License. org.apache.flink - flink-queryable-state-runtime_${scala.binary.version} + flink-queryable-state-runtime ${project.version} provided @@ -688,11 +688,6 @@ under the License. org.apache.flink:flink-shaded-zookeeper-3 - - - reference.conf - - diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index 23c197b9d74a3..e4ad9e13ef80d 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -105,9 +105,9 @@ - ../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar + ../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime-${project.version}.jar opt/ - flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar + flink-queryable-state-runtime-${project.version}.jar 0644 diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties index d372da150bf24..cfb52ad42e662 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties @@ -63,5 +63,5 @@ logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader logger.hadoopnative.level = OFF # Suppress the irrelevant (wrong) warnings from the Netty channel handler -logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF diff --git a/flink-dist/src/main/flink-bin/conf/log4j-console.properties b/flink-dist/src/main/flink-bin/conf/log4j-console.properties index 81868ab7d2a03..51353e4d4b2e8 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-console.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-console.properties @@ -64,5 +64,5 @@ appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10} # Suppress the irrelevant (wrong) warnings from the Netty channel handler -logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF diff --git a/flink-dist/src/main/flink-bin/conf/log4j-session.properties b/flink-dist/src/main/flink-bin/conf/log4j-session.properties index 2fd6f50cfc0e9..136a4a6a29833 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-session.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-session.properties @@ -28,7 +28,7 @@ appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Suppress the irrelevant (wrong) warnings from the Netty channel handler -logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = WARN diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties index 17d0a53c70018..477b00b639242 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j.properties @@ -57,5 +57,5 @@ appender.main.strategy.type = DefaultRolloverStrategy appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10} # Suppress the irrelevant (wrong) warnings from the Netty channel handler -logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF diff --git a/flink-dist/src/main/flink-bin/conf/logback-console.xml b/flink-dist/src/main/flink-bin/conf/logback-console.xml index 8cea03c3d0b72..d6cc276174f2d 100644 --- a/flink-dist/src/main/flink-bin/conf/logback-console.xml +++ b/flink-dist/src/main/flink-bin/conf/logback-console.xml @@ -60,5 +60,5 @@ - + diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml index ae0bfe7a33adb..f3c433105a7f9 100644 --- a/flink-dist/src/main/flink-bin/conf/logback.xml +++ b/flink-dist/src/main/flink-bin/conf/logback.xml @@ -52,7 +52,7 @@ - + diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index 30d9f9c84fa1d..11bafd083633d 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -10,12 +10,6 @@ This project bundles the following dependencies under the Apache Software Licens - com.google.code.findbugs:jsr305:1.3.9 - com.twitter:chill-java:0.7.6 - com.twitter:chill_2.11:0.7.6 -- com.typesafe:config:1.3.0 -- com.typesafe:ssl-config-core_2.11:0.3.7 -- com.typesafe.akka:akka-actor_2.11:2.5.21 -- com.typesafe.akka:akka-protobuf_2.11:2.5.21 -- com.typesafe.akka:akka-slf4j_2.11:2.5.21 -- com.typesafe.akka:akka-stream_2.11:2.5.21 - commons-cli:commons-cli:1.3.1 - commons-collections:commons-collections:3.2.2 - commons-io:commons-io:2.8.0 @@ -32,7 +26,6 @@ See bundled license files for details. - com.esotericsoftware.kryo:kryo:2.24.0 - com.esotericsoftware.minlog:minlog:1.2 -- org.clapper:grizzled-slf4j_2.11:1.3.2 The following dependencies all share the same BSD license which you find under licenses/LICENSE.scala. @@ -46,7 +39,6 @@ The following dependencies all share the same BSD license which you find under l This project bundles the following dependencies under the MIT/X11 license. See bundled license files for details. -- com.github.scopt:scopt_2.11:3.5.0 - org.slf4j:slf4j-api:1.7.15 This project bundles the following dependencies under the CDDL 1.1 license. @@ -54,7 +46,3 @@ See bundled license files for details. - javax.activation:javax.activation-api:1.2.0 - javax.xml.bind:jaxb-api:2.3.1 - -This project bundles the following dependencies under the Creative Commons CC0 "No Rights Reserved". - -- org.reactivestreams:reactive-streams:1.0.2 diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml index a9c8f9bc62aa4..68364e0840936 100644 --- a/flink-docs/pom.xml +++ b/flink-docs/pom.xml @@ -55,12 +55,12 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar diff --git a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml index 87ba854610c4d..2df34753c5ff0 100644 --- a/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml +++ b/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml index 9694de7d88f78..a2b464aefe3cd 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml @@ -57,7 +57,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} compile diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml index 2a997f7fbfaa0..963c94c80f9d2 100644 --- a/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml +++ b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml @@ -40,7 +40,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index d8d6ff602515f..2a0c2273477cd 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -354,7 +354,7 @@ function check_logs_for_errors { | grep -v "AskTimeoutException" \ | grep -v "Error while loading kafka-version.properties" \ | grep -v "WARN akka.remote.transport.netty.NettyTransport" \ - | grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \ + | grep -v "WARN org.jboss.netty.channel.DefaultChannelPipeline" \ | grep -v "jvm-exit-on-fatal-error" \ | grep -v 'INFO.*AWSErrorCode' \ | grep -v "RejectedExecutionException" \ @@ -389,7 +389,7 @@ function check_logs_for_exceptions { | grep -v "Cannot connect to ResourceManager right now" \ | grep -v "AskTimeoutException" \ | grep -v "WARN akka.remote.transport.netty.NettyTransport" \ - | grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \ + | grep -v "WARN org.jboss.netty.channel.DefaultChannelPipeline" \ | grep -v 'INFO.*AWSErrorCode' \ | grep -v "RejectedExecutionException" \ | grep -v "CancellationException" \ diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index e8729197201d2..5063ed33b5215 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -95,7 +95,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml index b15595293150f..fd11baadb01b9 100644 --- a/flink-examples/flink-examples-table/pom.xml +++ b/flink-examples/flink-examples-table/pom.xml @@ -80,7 +80,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index f5fb4660806fe..31657c9f22909 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -141,7 +141,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-compress/pom.xml b/flink-formats/flink-compress/pom.xml index 78301ed17ed74..871990f97f57a 100644 --- a/flink-formats/flink-compress/pom.xml +++ b/flink-formats/flink-compress/pom.xml @@ -66,7 +66,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml index 32877619dad13..32081ce301689 100644 --- a/flink-formats/flink-csv/pom.xml +++ b/flink-formats/flink-csv/pom.xml @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-hadoop-bulk/pom.xml b/flink-formats/flink-hadoop-bulk/pom.xml index bd820d1da328d..5433f9192eaa3 100644 --- a/flink-formats/flink-hadoop-bulk/pom.xml +++ b/flink-formats/flink-hadoop-bulk/pom.xml @@ -83,7 +83,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index a30c199d5c6f9..9c230c500cd1b 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -66,7 +66,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index 638cd28177581..b7f296c78c66b 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -118,7 +118,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml index 66887b3bc3a26..801ba21822636 100644 --- a/flink-formats/flink-parquet/pom.xml +++ b/flink-formats/flink-parquet/pom.xml @@ -174,7 +174,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-formats/flink-sequence-file/pom.xml b/flink-formats/flink-sequence-file/pom.xml index 9c23fe5e5cf85..dc23ad63d77a5 100644 --- a/flink-formats/flink-sequence-file/pom.xml +++ b/flink-formats/flink-sequence-file/pom.xml @@ -80,7 +80,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml index 939eaebc0ebde..ce8f36f7ad05e 100644 --- a/flink-fs-tests/pom.xml +++ b/flink-fs-tests/pom.xml @@ -58,7 +58,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -91,7 +91,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-kubernetes/pom.xml b/flink-kubernetes/pom.xml index 67cbc646c1f1b..95807a9a8ef11 100644 --- a/flink-kubernetes/pom.xml +++ b/flink-kubernetes/pom.xml @@ -48,14 +48,14 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} provided org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -82,7 +82,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index df3150cdd2636..25a590f991687 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -92,7 +92,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test @@ -107,7 +107,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml index 15fb758a00c28..50338d6e0e850 100644 --- a/flink-libraries/flink-gelly/pom.xml +++ b/flink-libraries/flink-gelly/pom.xml @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-optimizer_${scala.binary.version} + flink-optimizer ${project.version} test-jar test diff --git a/flink-libraries/flink-state-processing-api/pom.xml b/flink-libraries/flink-state-processing-api/pom.xml index f6dfe16412e1d..a07585ef4c412 100644 --- a/flink-libraries/flink-state-processing-api/pom.xml +++ b/flink-libraries/flink-state-processing-api/pom.xml @@ -63,7 +63,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -85,7 +85,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-metrics/flink-metrics-influxdb/pom.xml b/flink-metrics/flink-metrics-influxdb/pom.xml index d4033db8d70c2..d8955ef4be768 100644 --- a/flink-metrics/flink-metrics-influxdb/pom.xml +++ b/flink-metrics/flink-metrics-influxdb/pom.xml @@ -65,7 +65,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml index 8dd4f4b6d9938..698b51cb94a36 100644 --- a/flink-metrics/flink-metrics-jmx/pom.xml +++ b/flink-metrics/flink-metrics-jmx/pom.xml @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml index 8df0f8c00b9fc..53fa8fc51909f 100644 --- a/flink-metrics/flink-metrics-prometheus/pom.xml +++ b/flink-metrics/flink-metrics-prometheus/pom.xml @@ -84,14 +84,14 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml index fabee4341f557..d2697b6e332c8 100644 --- a/flink-optimizer/pom.xml +++ b/flink-optimizer/pom.xml @@ -29,7 +29,7 @@ under the License. .. - flink-optimizer_${scala.binary.version} + flink-optimizer Flink : Optimizer jar @@ -46,7 +46,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} @@ -75,7 +75,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 7067bbaba4833..391833ee93bb7 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -174,7 +174,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml index cd03a4506e256..15bc0bc740864 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/pom.xml +++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml @@ -30,7 +30,7 @@ under the License. .. - flink-queryable-state-runtime_${scala.binary.version} + flink-queryable-state-runtime Flink : Queryable state : Runtime jar @@ -47,7 +47,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} provided @@ -88,7 +88,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -114,7 +114,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-rpc/flink-rpc-akka-loader/pom.xml b/flink-rpc/flink-rpc-akka-loader/pom.xml new file mode 100644 index 0000000000000..304826d845698 --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/pom.xml @@ -0,0 +1,127 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-rpc + 1.14-SNAPSHOT + .. + + + flink-rpc-akka-loader + Flink : RPC : Akka-Loader + jar + This module contains the mechanism for loading flink-rpc-akka through a separate classloader. + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-rpc-core + ${project.version} + + + + + org.apache.flink + flink-rpc-akka + ${project.version} + + runtime + + true + + + + * + * + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-rpc-akka-jars + process-resources + + copy + + + + + org.apache.flink + flink-rpc-akka + ${project.version} + jar + true + flink-rpc-akka.jar + + + ${project.build.directory}/classes + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-rpc-akka + + + + + org.apache.flink:flink-rpc-akka + + META-INF/NOTICE + META-INF/licenses/** + + + + + + + + + + diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java new file mode 100644 index 0000000000000..0a3d0b8a1a510 --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.core.classloading.SubmoduleClassLoader; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemLoader; +import org.apache.flink.util.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ServiceLoader; +import java.util.UUID; + +/** + * Loader for the {@link AkkaRpcSystemLoader}. + * + *

This loader expects the flink-rpc-akka jar to be accessible via {@link + * ClassLoader#getResource(String)}. It will extract the jar into a temporary directory and create a + * new {@link SubmoduleClassLoader} to load the rpc system from that jar. + */ +public class AkkaRpcSystemLoader implements RpcSystemLoader { + + @Override + public RpcSystem loadRpcSystem(Configuration config) { + try { + final ClassLoader flinkClassLoader = RpcSystem.class.getClassLoader(); + + final String tmpDirectory = ConfigurationUtils.parseTempDirectories(config)[0]; + final Path tempFile = + Files.createFile( + Paths.get( + tmpDirectory, "_flink-rpc-akka_" + UUID.randomUUID() + ".jar")); + + final InputStream resourceStream = + flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar"); + if (resourceStream == null) { + throw new RuntimeException( + "Akka RPC system could not be found. If this happened while running a test in the IDE," + + "run the process-resources phase on flink-rpc/flink-rpc-akka-loader via maven."); + } + + IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile)); + + final SubmoduleClassLoader submoduleClassLoader = + new SubmoduleClassLoader( + new URL[] {tempFile.toUri().toURL()}, flinkClassLoader); + + return new CleanupOnCloseRpcSystem( + ServiceLoader.load(RpcSystem.class, submoduleClassLoader).iterator().next(), + submoduleClassLoader, + tempFile); + } catch (IOException e) { + throw new RuntimeException("Could not initialize RPC system.", e); + } + } +} diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java new file mode 100644 index 0000000000000..621047bdd1bca --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.classloading.SubmoduleClassLoader; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** An {@link RpcSystem} wrapper that cleans up resources after the RPC system has been closed. */ +class CleanupOnCloseRpcSystem implements RpcSystem { + private static final Logger LOG = LoggerFactory.getLogger(CleanupOnCloseRpcSystem.class); + + private final RpcSystem rpcSystem; + private final SubmoduleClassLoader pluginLoader; + private final Path tempFile; + + public CleanupOnCloseRpcSystem( + RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, Path tempFile) { + this.rpcSystem = Preconditions.checkNotNull(rpcSystem); + this.pluginLoader = Preconditions.checkNotNull(pluginLoader); + this.tempFile = Preconditions.checkNotNull(tempFile); + } + + @Override + public void close() { + rpcSystem.close(); + + try { + pluginLoader.close(); + } catch (Exception e) { + LOG.warn("Could not close RpcSystem classloader.", e); + } + try { + Files.delete(tempFile); + } catch (Exception e) { + LOG.warn("Could not delete temporary rpc system file {}.", tempFile, e); + } + } + + @Override + public RpcServiceBuilder localServiceBuilder(Configuration config) { + return rpcSystem.localServiceBuilder(config); + } + + @Override + public RpcServiceBuilder remoteServiceBuilder( + Configuration configuration, + @Nullable String externalAddress, + String externalPortRange) { + return rpcSystem.remoteServiceBuilder(configuration, externalAddress, externalPortRange); + } + + @Override + public String getRpcUrl( + String hostname, + int port, + String endpointName, + AddressResolution addressResolution, + Configuration config) + throws UnknownHostException { + return rpcSystem.getRpcUrl(hostname, port, endpointName, addressResolution, config); + } + + @Override + public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception { + return rpcSystem.getInetSocketAddressFromRpcUrl(url); + } + + @Override + public long getMaximumMessageSizeInBytes(Configuration config) { + return rpcSystem.getMaximumMessageSizeInBytes(config); + } +} diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader b/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader new file mode 100644 index 0000000000000..8387fa4c69658 --- /dev/null +++ b/flink-rpc/flink-rpc-akka-loader/src/main/resources/META-INF/services/org.apache.flink.runtime.rpc.RpcSystemLoader @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader diff --git a/flink-rpc/flink-rpc-akka/pom.xml b/flink-rpc/flink-rpc-akka/pom.xml index d7eb9b3545a55..a4cc211759c91 100644 --- a/flink-rpc/flink-rpc-akka/pom.xml +++ b/flink-rpc/flink-rpc-akka/pom.xml @@ -29,10 +29,16 @@ under the License. .. - flink-rpc-akka_${scala.binary.version} + flink-rpc-akka Flink : RPC : Akka jar + + 2.6.15 + 2.12 + 2.12.7 + + org.apache.flink @@ -53,102 +59,91 @@ under the License. provided + + org.scala-lang + scala-reflect + compile + org.scala-lang scala-library + compile + + + org.scala-lang + scala-compiler + compile com.typesafe.akka - akka-actor_${scala.binary.version} - - - - org.scala-lang - scala-library - - + akka-actor_${akka.scala.binary.version} + ${akka.version} - com.typesafe.akka - akka-remote_${scala.binary.version} - + akka-remote_${akka.scala.binary.version} + ${akka.version} - org.scala-lang - scala-library + + io.aeron + aeron-driver + + + + io.aeron + aeron-client - - + + com.typesafe.akka + akka-slf4j_${akka.scala.binary.version} + ${akka.version} + io.netty netty 3.10.6.Final - - com.typesafe.akka - akka-stream_${scala.binary.version} - - - - org.scala-lang - scala-library - - - com.typesafe - config - - + org.clapper + grizzled-slf4j_${akka.scala.binary.version} + 1.3.2 - - com.typesafe.akka - akka-protobuf_${scala.binary.version} - - - - org.scala-lang - scala-library - - + org.slf4j + slf4j-api + provided - com.typesafe.akka - akka-slf4j_${scala.binary.version} - - - - org.scala-lang - scala-library - - + com.google.code.findbugs + jsr305 + provided - org.clapper - grizzled-slf4j_${scala.binary.version} - - - - org.scala-lang - scala-library - - + org.apache.flink + flink-test-utils-junit org.apache.flink - flink-test-utils-junit + flink-core + ${project.version} + test + test-jar + + + + org.scalatest + scalatest_${akka.scala.binary.version} + 3.0.0 + test @@ -167,10 +162,30 @@ under the License. + + org.scala-lang + scala-compiler + ${akka.scala.version} + + + org.scala-lang + scala-library + ${akka.scala.version} + + + org.scala-lang + scala-reflect + ${akka.scala.version} + + + org.scala-lang.modules + scala-xml_${akka.scala.binary.version} + 1.0.6 + com.typesafe config - 1.3.0 + 1.4.0 @@ -201,20 +216,10 @@ under the License. - - - - com.typesafe.akka:akka-remote_* - io.netty:netty + + * - - - org.jboss.netty - org.apache.flink.shaded.akka.org.jboss.netty - - io.netty:netty @@ -228,11 +233,31 @@ under the License. + + + reference.conf + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + + enforce-versions + + enforce + + none + + + + net.alchim31.maven @@ -259,6 +284,7 @@ under the License. + ${akka.scala.version} -Xms128m -Xmx512m diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java index 8c69e14911745..3514704710349 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java @@ -35,13 +35,19 @@ import scala.concurrent.duration.FiniteDuration; -/** Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}. */ +/** + * Adapter to use a {@link ActorSystem} as a {@link ScheduledExecutor}. Furthermore ensures that the + * context class loader is set to the Flink class loader while the runnable is running. + */ public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor { private final ActorSystem actorSystem; + private final ClassLoader flinkClassLoader; - public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) { + public ActorSystemScheduledExecutorAdapter( + ActorSystem actorSystem, ClassLoader flinkClassLoader) { this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); + this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader, "flinkClassLoader"); } @Override @@ -86,7 +92,8 @@ public ScheduledFuture scheduleAtFixedRate( .schedule( new FiniteDuration(initialDelay, unit), new FiniteDuration(period, unit), - scheduledFutureTask, + ClassLoadingUtils.withContextClassLoader( + scheduledFutureTask, flinkClassLoader), actorSystem.dispatcher()); scheduledFutureTask.setCancellable(cancellable); @@ -111,13 +118,18 @@ public ScheduledFuture scheduleWithFixedDelay( @Override public void execute(@Nonnull Runnable command) { - actorSystem.dispatcher().execute(command); + actorSystem + .dispatcher() + .execute(ClassLoadingUtils.withContextClassLoader(command, flinkClassLoader)); } private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { return actorSystem .scheduler() - .scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); + .scheduleOnce( + new FiniteDuration(delay, unit), + ClassLoadingUtils.withContextClassLoader(runnable, flinkClassLoader), + actorSystem.dispatcher()); } private long now() { diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java new file mode 100644 index 0000000000000..623c99f89ea3e --- /dev/null +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtils.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent.akka; + +import org.apache.flink.util.TemporaryClassLoaderContext; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** Classloading utilities. */ +public class ClassLoadingUtils { + + /** + * Wraps the given runnable in a {@link TemporaryClassLoaderContext} to prevent the plugin class + * loader from leaking into Flink. + * + * @param runnable runnable to wrap + * @param contextClassLoader class loader that should be set as the context class loader + * @return wrapped runnable + */ + public static Runnable withContextClassLoader( + Runnable runnable, ClassLoader contextClassLoader) { + return () -> runWithContextClassLoader(runnable::run, contextClassLoader); + } + + /** + * Wraps the given executor such that all submitted are runnables are run in a {@link + * TemporaryClassLoaderContext} based on the given classloader. + * + * @param executor executor to wrap + * @param contextClassLoader class loader that should be set as the context class loader + * @return wrapped executor + */ + public static Executor withContextClassLoader( + Executor executor, ClassLoader contextClassLoader) { + return new ContextClassLoaderSettingExecutor(executor, contextClassLoader); + } + + /** + * Runs the given runnable in a {@link TemporaryClassLoaderContext} to prevent the plugin class + * loader from leaking into Flink. + * + * @param runnable runnable to run + * @param contextClassLoader class loader that should be set as the context class loader + */ + public static void runWithContextClassLoader( + ThrowingRunnable runnable, ClassLoader contextClassLoader) throws T { + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(contextClassLoader)) { + runnable.run(); + } + } + + /** + * Runs the given supplier in a {@link TemporaryClassLoaderContext} based on the given + * classloader. + * + * @param supplier supplier to run + * @param contextClassLoader class loader that should be set as the context class loader + */ + public static T runWithContextClassLoader( + SupplierWithException supplier, ClassLoader contextClassLoader) throws E { + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(contextClassLoader)) { + return supplier.get(); + } + } + + public static CompletableFuture guardCompletionWithContextClassLoader( + CompletableFuture future, ClassLoader contextClassLoader) { + final CompletableFuture guardedFuture = new CompletableFuture<>(); + future.whenComplete( + (value, throwable) -> + runWithContextClassLoader( + () -> FutureUtils.doForward(value, throwable, guardedFuture), + contextClassLoader)); + return guardedFuture; + } + + /** + * An {@link Executor} wrapper that temporarily resets the ContextClassLoader to the given + * ClassLoader. + */ + private static class ContextClassLoaderSettingExecutor implements Executor { + + private final Executor backingExecutor; + private final ClassLoader contextClassLoader; + + public ContextClassLoaderSettingExecutor( + Executor backingExecutor, ClassLoader contextClassLoader) { + this.backingExecutor = backingExecutor; + this.contextClassLoader = contextClassLoader; + } + + @Override + public void execute(Runnable command) { + backingExecutor.execute( + ClassLoadingUtils.withContextClassLoader(command, contextClassLoader)); + } + } +} diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 273d9522f0655..b4fd6ab4e9231 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -55,6 +55,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -77,6 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc private final ActorRef rpcEndpoint; + private final ClassLoader flinkClassLoader; + // whether the actor ref is local and thus no message serialization is needed protected final boolean isLocal; @@ -97,11 +100,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc Time timeout, long maximumFramesize, @Nullable CompletableFuture terminationFuture, - boolean captureAskCallStack) { + boolean captureAskCallStack, + ClassLoader flinkClassLoader) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); + this.flinkClassLoader = Preconditions.checkNotNull(flinkClassLoader); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); this.timeout = Preconditions.checkNotNull(timeout); this.maximumFramesize = maximumFramesize; @@ -383,7 +388,10 @@ protected void tell(Object message) { * @return Response future */ protected CompletableFuture ask(Object message, Time timeout) { - return AkkaFutureUtils.toJava(Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds())); + final CompletableFuture response = + AkkaFutureUtils.toJava( + Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds())); + return guardCompletionWithContextClassLoader(response, flinkClassLoader); } @Override diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 588746199e9d5..d86d7a464f5e7 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -59,6 +59,7 @@ import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise; +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -86,6 +87,8 @@ class AkkaRpcActor extends AbstractActor { /** the endpoint to invoke the methods on. */ protected final T rpcEndpoint; + private final ClassLoader flinkClassLoader; + /** the helper that tracks whether calls come from the main thread. */ private final MainThreadValidatorUtil mainThreadValidator; @@ -105,10 +108,12 @@ class AkkaRpcActor extends AbstractActor { final T rpcEndpoint, final CompletableFuture terminationFuture, final int version, - final long maximumFramesize) { + final long maximumFramesize, + final ClassLoader flinkClassLoader) { checkArgument(maximumFramesize > 0, "Maximum framesize must be positive."); this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); + this.flinkClassLoader = checkNotNull(flinkClassLoader); this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); this.terminationFuture = checkNotNull(terminationFuture); this.version = version; @@ -177,13 +182,13 @@ private void handleControlMessage(ControlMessages controlMessage) { try { switch (controlMessage) { case START: - state = state.start(this); + state = state.start(this, flinkClassLoader); break; case STOP: state = state.stop(); break; case TERMINATE: - state = state.terminate(this); + state = state.terminate(this, flinkClassLoader); break; default: handleUnknownControlMessage(controlMessage); @@ -296,13 +301,21 @@ private void handleRpcInvocation(RpcInvocation rpcInvocation) { // this supports declaration of anonymous classes rpcMethod.setAccessible(true); + final Method capturedRpcMethod = rpcMethod; if (rpcMethod.getReturnType().equals(Void.TYPE)) { // No return value to send back - rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + runWithContextClassLoader( + () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()), + flinkClassLoader); } else { final Object result; try { - result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + result = + runWithContextClassLoader( + () -> + capturedRpcMethod.invoke( + rpcEndpoint, rpcInvocation.getArgs()), + flinkClassLoader); } catch (InvocationTargetException e) { log.debug( "Reporting back error thrown in remote procedure {}", rpcMethod, e); @@ -416,7 +429,9 @@ private Either serializeRemoteResultAn */ private void handleCallAsync(CallAsync callAsync) { try { - Object result = callAsync.getCallable().call(); + Object result = + runWithContextClassLoader( + () -> callAsync.getCallable().call(), flinkClassLoader); getSender().tell(new Status.Success(result), getSelf()); } catch (Throwable e) { @@ -437,7 +452,7 @@ private void handleRunAsync(RunAsync runAsync) { if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) { // run immediately try { - runAsync.getRunnable().run(); + runWithContextClassLoader(() -> runAsync.getRunnable().run(), flinkClassLoader); } catch (Throwable t) { log.error("Caught exception while executing runnable in main thread.", t); ExceptionUtils.rethrowIfFatalErrorOrOOM(t); @@ -510,7 +525,7 @@ private void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) { // --------------------------------------------------------------------------- interface State { - default State start(AkkaRpcActor akkaRpcActor) { + default State start(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { throw new AkkaRpcInvalidStateException( invalidStateTransitionMessage(StartedState.STARTED)); } @@ -520,7 +535,7 @@ default State stop() { invalidStateTransitionMessage(StoppedState.STOPPED)); } - default State terminate(AkkaRpcActor akkaRpcActor) { + default State terminate(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { throw new AkkaRpcInvalidStateException( invalidStateTransitionMessage(TerminatingState.TERMINATING)); } @@ -545,7 +560,7 @@ enum StartedState implements State { STARTED; @Override - public State start(AkkaRpcActor akkaRpcActor) { + public State start(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { return STARTED; } @@ -555,12 +570,15 @@ public State stop() { } @Override - public State terminate(AkkaRpcActor akkaRpcActor) { + public State terminate(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { akkaRpcActor.mainThreadValidator.enterMainThread(); CompletableFuture terminationFuture; try { - terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop(); + terminationFuture = + runWithContextClassLoader( + () -> akkaRpcActor.rpcEndpoint.internalCallOnStop(), + flinkClassLoader); } catch (Throwable t) { terminationFuture = FutureUtils.completedExceptionally( @@ -598,11 +616,12 @@ enum StoppedState implements State { STOPPED; @Override - public State start(AkkaRpcActor akkaRpcActor) { + public State start(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { akkaRpcActor.mainThreadValidator.enterMainThread(); try { - akkaRpcActor.rpcEndpoint.internalCallOnStart(); + runWithContextClassLoader( + () -> akkaRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader); } catch (Throwable throwable) { akkaRpcActor.stop( RpcEndpointTerminationResult.failure( @@ -624,7 +643,7 @@ public State stop() { } @Override - public State terminate(AkkaRpcActor akkaRpcActor) { + public State terminate(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { akkaRpcActor.stop(RpcEndpointTerminationResult.success()); return TerminatingState.TERMINATING; @@ -636,7 +655,7 @@ enum TerminatingState implements State { TERMINATING; @Override - public State terminate(AkkaRpcActor akkaRpcActor) { + public State terminate(AkkaRpcActor akkaRpcActor, ClassLoader flinkClassLoader) { return TERMINATING; } diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 4c1a788299767..c2b20fbe20a1a 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -78,6 +78,9 @@ import scala.Option; import scala.reflect.ClassTag$; +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.guardCompletionWithContextClassLoader; +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.withContextClassLoader; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -98,6 +101,8 @@ public class AkkaRpcService implements RpcService { private final ActorSystem actorSystem; private final AkkaRpcServiceConfiguration configuration; + private final ClassLoader flinkClassLoader; + @GuardedBy("lock") private final Map actors = new HashMap<>(4); @@ -117,8 +122,16 @@ public class AkkaRpcService implements RpcService { @VisibleForTesting public AkkaRpcService( final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) { + this(actorSystem, configuration, AkkaRpcService.class.getClassLoader()); + } + + AkkaRpcService( + final ActorSystem actorSystem, + final AkkaRpcServiceConfiguration configuration, + final ClassLoader flinkClassLoader) { this.actorSystem = checkNotNull(actorSystem, "actor system"); this.configuration = checkNotNull(configuration, "akka rpc service configuration"); + this.flinkClassLoader = checkNotNull(flinkClassLoader, "flinkClassLoader"); Address actorSystemAddress = AkkaUtils.getAddress(actorSystem); @@ -136,7 +149,14 @@ public AkkaRpcService( captureAskCallstacks = configuration.captureAskCallStack(); - internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); + // Akka always sets the threads context class loader to the class loader with which it was + // loaded (i.e., the plugin class loader) + // we must ensure that the context class loader is set to the Flink class loader when we + // call into Flink + // otherwise we could leak the plugin class loader or poison the context class loader of + // external threads (because they inherit the current threads context class loader) + internalScheduledExecutor = + new ActorSystemScheduledExecutorAdapter(actorSystem, flinkClassLoader); terminationFuture = new CompletableFuture<>(); @@ -158,7 +178,9 @@ private Supervisor startSupervisorActor() { new ExecutorThreadFactory( "AkkaRpcService-Supervisor-Termination-Future-Executor")); final ActorRef actorRef = - SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor); + SupervisorActor.startSupervisorActor( + actorSystem, + withContextClassLoader(terminationFutureExecutor, flinkClassLoader)); return Supervisor.create(actorRef, terminationFutureExecutor); } @@ -199,7 +221,8 @@ public CompletableFuture connect( configuration.getTimeout(), configuration.getMaximumFramesize(), null, - captureAskCallstacks); + captureAskCallstacks, + flinkClassLoader); }); } @@ -221,7 +244,8 @@ public > CompletableFuture configuration.getMaximumFramesize(), null, () -> fencingToken, - captureAskCallstacks); + captureAskCallstacks, + flinkClassLoader); }); } @@ -268,7 +292,8 @@ public RpcServer startServer(C rpcEndpoint) configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken, - captureAskCallstacks); + captureAskCallstacks, + flinkClassLoader); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { @@ -280,7 +305,8 @@ public RpcServer startServer(C rpcEndpoint) configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, - captureAskCallstacks); + captureAskCallstacks, + flinkClassLoader); } // Rather than using the System ClassLoader directly, we derive the ClassLoader @@ -322,7 +348,8 @@ SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) { rpcEndpoint, actorTerminationFuture, getVersion(), - configuration.getMaximumFramesize()), + configuration.getMaximumFramesize(), + flinkClassLoader), rpcEndpoint.getEndpointId()); final SupervisorActor.ActorRegistration actorRegistration = @@ -354,7 +381,8 @@ public RpcServer fenceRpcServer(RpcServer rpcServer, F configuration.getMaximumFramesize(), null, () -> fencingToken, - captureAskCallstacks); + captureAskCallstacks, + flinkClassLoader); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink @@ -424,11 +452,9 @@ public CompletableFuture stopService() { actorSystemTerminationFuture.whenComplete( (Void ignored, Throwable throwable) -> { - if (throwable != null) { - terminationFuture.completeExceptionally(throwable); - } else { - terminationFuture.complete(null); - } + runWithContextClassLoader( + () -> FutureUtils.doForward(ignored, throwable, terminationFuture), + flinkClassLoader); LOG.info("Stopped Akka RPC service."); }); @@ -535,27 +561,33 @@ clazz, getVersion()), HandshakeSuccessMessage .class)))); - return actorRefFuture.thenCombineAsync( - handshakeFuture, - (ActorRef actorRef, HandshakeSuccessMessage ignored) -> { - InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); - - // Rather than using the System ClassLoader directly, we derive the ClassLoader - // from this class . That works better in cases where Flink runs embedded and - // all Flink - // code is loaded dynamically (for example from an OSGI bundle) through a custom - // ClassLoader - ClassLoader classLoader = getClass().getClassLoader(); - - @SuppressWarnings("unchecked") - C proxy = - (C) - Proxy.newProxyInstance( - classLoader, new Class[] {clazz}, invocationHandler); - - return proxy; - }, - actorSystem.dispatcher()); + final CompletableFuture gatewayFuture = + actorRefFuture.thenCombineAsync( + handshakeFuture, + (ActorRef actorRef, HandshakeSuccessMessage ignored) -> { + InvocationHandler invocationHandler = + invocationHandlerFactory.apply(actorRef); + + // Rather than using the System ClassLoader directly, we derive the + // ClassLoader from this class. + // That works better in cases where Flink runs embedded and + // all Flink code is loaded dynamically + // (for example from an OSGI bundle) through a custom ClassLoader + ClassLoader classLoader = getClass().getClassLoader(); + + @SuppressWarnings("unchecked") + C proxy = + (C) + Proxy.newProxyInstance( + classLoader, + new Class[] {clazz}, + invocationHandler); + + return proxy; + }, + actorSystem.dispatcher()); + + return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader); } private CompletableFuture resolveActorAddress(String address) { diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index ae23abb100d34..2e91569d933e9 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -23,9 +23,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.util.NetUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TemporaryClassLoaderContext; +import org.apache.flink.util.function.TriFunction; import akka.actor.ActorSystem; import com.typesafe.config.Config; @@ -38,7 +41,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Optional; -import java.util.function.BiFunction; import static org.apache.flink.util.NetUtils.isValidClientPort; import static org.apache.flink.util.Preconditions.checkArgument; @@ -58,10 +60,10 @@ public class AkkaRpcServiceUtils { static final String SUPERVISOR_NAME = "rpc"; private static final String SIMPLE_AKKA_CONFIG_TEMPLATE = - "akka {remote {netty.tcp {maximum-frame-size = %s}}}"; + "akka {remote.classic {netty.tcp {maximum-frame-size = %s}}}"; private static final String MAXIMUM_FRAME_SIZE_PATH = - "akka.remote.netty.tcp.maximum-frame-size"; + "akka.remote.classic.netty.tcp.maximum-frame-size"; // ------------------------------------------------------------------------ // RPC instantiation @@ -326,7 +328,8 @@ public AkkaRpcService createAndStart() throws Exception { } public AkkaRpcService createAndStart( - BiFunction constructor) + TriFunction + constructor) throws Exception { if (actorSystemExecutorConfiguration == null) { actorSystemExecutorConfiguration = @@ -336,32 +339,39 @@ public AkkaRpcService createAndStart( final ActorSystem actorSystem; - if (externalAddress == null) { - // create local actor system - actorSystem = - AkkaBootstrapTools.startLocalActorSystem( - configuration, - actorSystemName, - logger, - actorSystemExecutorConfiguration, - customConfig); - } else { - // create remote actor system - actorSystem = - AkkaBootstrapTools.startRemoteActorSystem( - configuration, - actorSystemName, - externalAddress, - externalPortRange, - bindAddress, - Optional.ofNullable(bindPort), - logger, - actorSystemExecutorConfiguration, - customConfig); + // akka internally caches the context class loader + // make sure it uses the plugin class loader + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(getClass().getClassLoader())) { + if (externalAddress == null) { + // create local actor system + actorSystem = + AkkaBootstrapTools.startLocalActorSystem( + configuration, + actorSystemName, + logger, + actorSystemExecutorConfiguration, + customConfig); + } else { + // create remote actor system + actorSystem = + AkkaBootstrapTools.startRemoteActorSystem( + configuration, + actorSystemName, + externalAddress, + externalPortRange, + bindAddress, + Optional.ofNullable(bindPort), + logger, + actorSystemExecutorConfiguration, + customConfig); + } } return constructor.apply( - actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration)); + actorSystem, + AkkaRpcServiceConfiguration.fromConfiguration(configuration), + RpcService.class.getClassLoader()); } } diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java index 8557896f8734d..134c247f7769f 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java @@ -64,7 +64,8 @@ public FencedAkkaInvocationHandler( long maximumFramesize, @Nullable CompletableFuture terminationFuture, Supplier fencingTokenSupplier, - boolean captureAskCallStacks) { + boolean captureAskCallStacks, + ClassLoader flinkClassLoader) { super( address, hostname, @@ -72,7 +73,8 @@ public FencedAkkaInvocationHandler( timeout, maximumFramesize, terminationFuture, - captureAskCallStacks); + captureAskCallStacks, + flinkClassLoader); this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier); } diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java index d35e92becc7f5..56220facf25a9 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java @@ -45,8 +45,9 @@ public FencedAkkaRpcActor( T rpcEndpoint, CompletableFuture terminationFuture, int version, - final long maximumFramesize) { - super(rpcEndpoint, terminationFuture, version, maximumFramesize); + final long maximumFramesize, + ClassLoader flinkClassLoader) { + super(rpcEndpoint, terminationFuture, version, maximumFramesize, flinkClassLoader); } @Override diff --git a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE index e00197115fd65..235f9525093c1 100644 --- a/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE +++ b/flink-rpc/flink-rpc-akka/src/main/resources/META-INF/NOTICE @@ -6,8 +6,35 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- com.typesafe.akka:akka-remote_2.11:2.5.21 +- com.hierynomus:asn-one:0.5.0 +- com.typesafe:config:1.4.0 +- com.typesafe:ssl-config-core_2.12:0.4.2 +- com.typesafe.akka:akka-actor_2.12:2.6.15 +- com.typesafe.akka:akka-remote_2.12:2.6.15 +- com.typesafe.akka:akka-pki_2.12:2.6.15 +- com.typesafe.akka:akka-protobuf-v3_2.12:2.6.15 +- com.typesafe.akka:akka-slf4j_2.12:2.6.15 +- com.typesafe.akka:akka-stream_2.12:2.6.15 - io.netty:netty:3.10.6.Final +- org.agrona:agrona:1.9.0 + +This project bundles the following dependencies under the BSD license. +See bundled license files for details. + +- org.clapper:grizzled-slf4j_2.12:1.3.2 + +The following dependencies all share the same BSD license which you find under licenses/LICENSE.scala. + +- org.scala-lang:scala-compiler:2.12.7 +- org.scala-lang:scala-library:2.12.7 +- org.scala-lang:scala-reflect:2.12.7 +- org.scala-lang.modules:scala-java8-compat_2.12:0.8.0 +- org.scala-lang.modules:scala-parser-combinators_2.12:1.0.4 +- org.scala-lang.modules:scala-xml_2.12:1.0.6 + +This project bundles the following dependencies under the Creative Commons CC0 "No Rights Reserved". + +- org.reactivestreams:reactive-streams:1.0.3 This project bundles io.netty:netty:3.10.6.Final from which it inherits the following notices: @@ -42,4 +69,4 @@ WebSocket and HTTP server: * LICENSE: * licenses/LICENSE.webbit (BSD License) * HOMEPAGE: - * https://github.com/joewalnes/webbit + * https://github.com/joewalnes/webbit \ No newline at end of file diff --git a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala index 20d63491a95fa..323b5a3e5c592 100644 --- a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala +++ b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/AkkaUtils.scala @@ -299,6 +299,7 @@ object AkkaUtils { | guardian-supervisor-strategy = $supervisorStrategy | | warn-about-java-serializer-usage = off + | allow-java-serialization = on | | default-dispatcher { | throughput = $akkaThroughput @@ -470,9 +471,10 @@ object AkkaUtils { | provider = "akka.remote.RemoteActorRefProvider" | } | - | remote { - | startup-timeout = $startupTimeout + | remote.artery.enabled = false + | remote.startup-timeout = $startupTimeout | + | remote.classic { | # disable the transport failure detector by setting very high values | transport-failure-detector{ | acceptable-heartbeat-pause = 6000 s @@ -480,6 +482,8 @@ object AkkaUtils { | threshold = 300 | } | + | enabled-transports = ["akka.remote.classic.netty.tcp"] + | | netty { | tcp { | transport-class = "akka.remote.transport.netty.NettyTransport" @@ -522,7 +526,7 @@ object AkkaUtils { val hostnameConfigString = s""" |akka { - | remote { + | remote.classic { | netty { | tcp { | hostname = "$effectiveHostname" @@ -536,13 +540,13 @@ object AkkaUtils { val sslConfigString = if (akkaEnableSSLConfig) { s""" |akka { - | remote { + | remote.classic { | - | enabled-transports = ["akka.remote.netty.ssl"] + | enabled-transports = ["akka.remote.classic.netty.ssl"] | | netty { | - | ssl = $${akka.remote.netty.tcp} + | ssl = $${akka.remote.classic.netty.tcp} | | ssl { | diff --git a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala index fe92eea0f0e4a..f5247e8e59ba9 100644 --- a/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala +++ b/flink-rpc/flink-rpc-akka/src/main/scala/org/apache/flink/runtime/rpc/akka/CustomSSLEngineProvider.scala @@ -26,7 +26,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.util.FingerprintTrust class CustomSSLEngineProvider(system : akka.actor.ActorSystem) extends ConfigSSLEngineProvider(system) { - private val securityConfig = system.settings.config.getConfig("akka.remote.netty.ssl.security") + private val securityConfig = system.settings.config. + getConfig("akka.remote.classic.netty.ssl.security") private val SSLTrustStore = securityConfig.getString("trust-store") private val SSLTrustStorePassword = securityConfig.getString("trust-store-password") private val SSLCertFingerprints = securityConfig.getStringList("cert-fingerprints") diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java new file mode 100644 index 0000000000000..115e185f60cec --- /dev/null +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/concurrent/akka/ClassLoadingUtilsTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent.akka; + +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.Test; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** Tests for the {@link ClassLoadingUtils}. */ +public class ClassLoadingUtilsTest extends TestLogger { + + private static final ClassLoader TEST_CLASS_LOADER = + new URLClassLoader(new URL[0], ClassLoadingUtilsTest.class.getClassLoader()); + + @Test + public void testRunnableWithContextClassLoader() throws Exception { + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + Runnable runnable = + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()); + + final Runnable wrappedRunnable = + ClassLoadingUtils.withContextClassLoader(runnable, TEST_CLASS_LOADER); + + // the runnable should only be wrapped, not run immediately + assertThat(contextClassLoader.isDone(), is(false)); + + wrappedRunnable.run(); + assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER)); + } + + @Test + public void testExecutorWithContextClassLoader() throws Exception { + final Executor wrappedExecutor = + ClassLoadingUtils.withContextClassLoader( + Executors.newDirectExecutorService(), TEST_CLASS_LOADER); + + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + Runnable runnable = + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()); + + wrappedExecutor.execute(runnable); + assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER)); + } + + @Test + public void testRunRunnableWithContextClassLoader() throws Exception { + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + ThrowingRunnable runnable = + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()); + + ClassLoadingUtils.runWithContextClassLoader(runnable, TEST_CLASS_LOADER); + assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER)); + } + + @Test + public void testRunSupplierWithContextClassLoader() throws Exception { + SupplierWithException runnable = + () -> Thread.currentThread().getContextClassLoader(); + + final ClassLoader contextClassLoader = + ClassLoadingUtils.runWithContextClassLoader(runnable, TEST_CLASS_LOADER); + assertThat(contextClassLoader, is(TEST_CLASS_LOADER)); + } + + @Test + public void testGuardCompletionWithContextClassLoader() throws Exception { + final CompletableFuture originalFuture = new CompletableFuture<>(); + + final CompletableFuture guardedFuture = + ClassLoadingUtils.guardCompletionWithContextClassLoader( + originalFuture, TEST_CLASS_LOADER); + + final CompletableFuture contextClassLoader = + guardedFuture.thenApply(ignored -> Thread.currentThread().getContextClassLoader()); + + originalFuture.complete(null); + assertThat(contextClassLoader.get(), is(TEST_CLASS_LOADER)); + } +} diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java new file mode 100644 index 0000000000000..1a585a1f6518f --- /dev/null +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; +import static org.hamcrest.CoreMatchers.either; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests the context class loader handling in various parts of the akka rpc system. + * + *

The tests check cases where we call from the akka rpc system into Flink, in which case the + * context class loader must be set to the Flink class loader. This ensures that the Akka class + * loader does not get accidentally leaked, e.g., via thread locals or thread pools on the Flink + * side. + */ +public class ContextClassLoadingSettingTest extends TestLogger { + + private static final Time TIMEOUT = Time.milliseconds(10000L); + + // Many of the contained tests assert that a future is completed with a specific context class + // loader by applying a synchronous operation. + // If the initial future is completed by the time we apply the synchronous operation the test + // thread will execute the operation instead. The tests are thus susceptible to timing issues. + // We hence take a probabilistic approach: Assume that this timing is rare, guard these calls in + // the test with a temporary class loader context, and assert that the actually used + // context class loader is _either_ the one we truly expect or the temporary one. + private static final ClassLoader testClassLoader = + new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + + private ClassLoader pretendFlinkClassLoader; + private ActorSystem actorSystem; + private AkkaRpcService akkaRpcService; + + @Before + public void setup() { + pretendFlinkClassLoader = + new URLClassLoader( + new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + actorSystem = AkkaUtils.createDefaultActorSystem(); + akkaRpcService = + new AkkaRpcService( + actorSystem, + AkkaRpcServiceConfiguration.defaultConfiguration(), + pretendFlinkClassLoader); + } + + @After + public void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = + AkkaFutureUtils.toJava(actorSystem.terminate()); + + FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + actorSystem = null; + akkaRpcService = null; + } + + @Test + public void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + akkaRpcService.execute( + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader())); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture contextClassLoader = + akkaRpcService.execute(() -> Thread.currentThread().getContextClassLoader()); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + + final CompletableFuture blocker = new CompletableFuture<>(); + + final CompletableFuture contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .execute((Callable) blocker::get) + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()), + testClassLoader); + blocker.complete(null); + + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + akkaRpcService.scheduleRunnable( + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()), + 5, + TimeUnit.MILLISECONDS); + assertThat(contextClassLoader.get(), is(pretendFlinkClassLoader)); + } + + @Test + public void testAkkaRpcService_ConnectFutureCompletedWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + + final ClassLoader contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .connect( + testEndpoint.getAddress(), + TestEndpointGateway.class) + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()) + .get(), + testClassLoader); + assertIsFlinkClassLoader(contextClassLoader); + } + } + + @Test + public void testAkkaRpcService_TerminationFutureCompletedWithFlinkContextClassLoader() + throws Exception { + final ClassLoader contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .stopService() + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()) + .get(), + testClassLoader); + + assertIsFlinkClassLoader(contextClassLoader); + } + + @Test + public void testAkkaRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + assertIsFlinkClassLoader(testEndpoint.onStartClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception { + final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); + testEndpoint.start(); + testEndpoint.close(); + + assertIsFlinkClassLoader(testEndpoint.onStopClassLoader.get()); + } + + @Test + public void testAkkaRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final CompletableFuture contextClassLoader = testEndpoint.doCallAsync(); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final CompletableFuture contextClassLoader = testEndpoint.doRunAsync(); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + testEndpointGateway.doSomethingWithoutReturningAnything(); + + assertIsFlinkClassLoader(testEndpoint.voidOperationClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + final ClassLoader contextClassLoader = + testEndpointGateway.getContextClassLoader().get(); + assertIsFlinkClassLoader(contextClassLoader); + } + } + + @Test + public void testAkkaRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + final CompletableFuture contextClassLoader = + runWithContextClassLoader( + () -> + testEndpointGateway + .doSomethingAsync() + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()), + testClassLoader); + testEndpoint.completeRPCFuture(); + + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testSupervisorActor_TerminationFutureCompletedWithFlinkContextClassLoader() + throws Exception { + final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); + testEndpoint.start(); + + final ClassLoader contextClassLoader = + runWithContextClassLoader( + () -> + testEndpoint + .closeAsync() + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()) + .get(), + testClassLoader); + + assertIsFlinkClassLoader(contextClassLoader); + } + + private void assertIsFlinkClassLoader(ClassLoader classLoader) { + assertThat(classLoader, either(is(pretendFlinkClassLoader)).or(is(testClassLoader))); + } + + private interface TestEndpointGateway extends RpcGateway { + CompletableFuture getContextClassLoader(); + + CompletableFuture doSomethingAsync(); + + CompletableFuture doCallAsync(); + + CompletableFuture doRunAsync(); + + void doSomethingWithoutReturningAnything(); + } + + private static class TestEndpoint extends RpcEndpoint implements TestEndpointGateway { + + private final CompletableFuture onStartClassLoader = new CompletableFuture<>(); + private final CompletableFuture onStopClassLoader = new CompletableFuture<>(); + private final CompletableFuture voidOperationClassLoader = + new CompletableFuture<>(); + private final CompletableFuture rpcResponseFuture = new CompletableFuture<>(); + + protected TestEndpoint(RpcService rpcService) { + super(rpcService); + } + + @Override + protected void onStart() throws Exception { + onStartClassLoader.complete(Thread.currentThread().getContextClassLoader()); + super.onStart(); + } + + @Override + protected CompletableFuture onStop() { + onStopClassLoader.complete(Thread.currentThread().getContextClassLoader()); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture doSomethingAsync() { + return rpcResponseFuture; + } + + @Override + public CompletableFuture doCallAsync() { + return callAsync( + () -> Thread.currentThread().getContextClassLoader(), + Time.of(10, TimeUnit.SECONDS)); + } + + @Override + public CompletableFuture doRunAsync() { + final CompletableFuture contextClassLoader = new CompletableFuture<>(); + runAsync( + () -> + contextClassLoader.complete( + Thread.currentThread().getContextClassLoader())); + return contextClassLoader; + } + + @Override + public void doSomethingWithoutReturningAnything() { + voidOperationClassLoader.complete(Thread.currentThread().getContextClassLoader()); + } + + public void completeRPCFuture() { + rpcResponseFuture.complete(null); + } + + @Override + public CompletableFuture getContextClassLoader() { + return CompletableFuture.completedFuture( + Thread.currentThread().getContextClassLoader()); + } + } +} diff --git a/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala b/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala index 5b65eaf0c7ecc..eb11cbae7541e 100644 --- a/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala +++ b/flink-rpc/flink-rpc-akka/src/test/scala/org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.scala @@ -65,7 +65,7 @@ class AkkaUtilsTest } test("getHostFromAkkaURL should return host after at sign") { - val url = "akka://flink@localhost:1234/user/jobmanager" + val url = "akka.tcp://flink@localhost:1234/user/jobmanager" val expected = new InetSocketAddress("localhost", 1234) val result = AkkaUtils.getInetSocketAddressFromAkkaURL(url) @@ -146,14 +146,14 @@ class AkkaUtilsTest val akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port) - akkaConfig.getString("akka.remote.netty.tcp.hostname") should + akkaConfig.getString("akka.remote.classic.netty.tcp.hostname") should equal(NetUtils.unresolvedHostToNormalizedString(hostname)) } test("null hostname should go to localhost") { val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 1772))) - val hostname = configure.getString("akka.remote.netty.tcp.hostname") + val hostname = configure.getString("akka.remote.classic.netty.tcp.hostname") InetAddress.getByName(hostname).isLoopbackAddress should be(true) } @@ -196,7 +196,7 @@ class AkkaUtilsTest val akkaConfig = AkkaUtils.getAkkaConfig(configuration, ipv6AddressString, port) - akkaConfig.getString("akka.remote.netty.tcp.hostname") should + akkaConfig.getString("akka.remote.classic.netty.tcp.hostname") should equal(NetUtils.unresolvedHostToNormalizedString(ipv6AddressString)) } @@ -216,7 +216,7 @@ class AkkaUtilsTest val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337))) - val sslConfig = akkaConfig.getConfig("akka.remote.netty.ssl") + val sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl") sslConfig.getString("ssl-engine-provider") should equal("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider") @@ -233,7 +233,7 @@ class AkkaUtilsTest val akkaConfig = AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337))) - val sslConfig = akkaConfig.getConfig("akka.remote.netty.ssl") + val sslConfig = akkaConfig.getConfig("akka.remote.classic.netty.ssl") sslConfig.getString("ssl-engine-provider") should equal("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider") diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java index 5934c607d4d24..9f76e8d68485e 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java @@ -88,8 +88,7 @@ static RpcSystem load() { * @return loaded RpcSystem */ static RpcSystem load(Configuration config) { - final ClassLoader classLoader = RpcSystem.class.getClassLoader(); - return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next(); + return ServiceLoader.load(RpcSystemLoader.class).iterator().next().loadRpcSystem(config); } /** Descriptor for creating a fork-join thread-pool. */ diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java new file mode 100644 index 0000000000000..aff47a58325a5 --- /dev/null +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystemLoader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.configuration.Configuration; + +/** A loader for an {@link RpcSystem}. */ +public interface RpcSystemLoader { + RpcSystem loadRpcSystem(Configuration config); +} diff --git a/flink-rpc/pom.xml b/flink-rpc/pom.xml index dd405eddf454a..42aba1e38343d 100644 --- a/flink-rpc/pom.xml +++ b/flink-rpc/pom.xml @@ -36,5 +36,6 @@ under the License. flink-rpc-core flink-rpc-akka + flink-rpc-akka-loader diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index 6fe9c9102c3bb..a7646808ea081 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -48,7 +48,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} @@ -94,7 +94,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test @@ -114,7 +114,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 7e8d1c52e7786..d29c369f8b57e 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -29,7 +29,7 @@ under the License. .. - flink-runtime_${scala.binary.version} + flink-runtime Flink : Runtime jar @@ -52,7 +52,7 @@ under the License. org.apache.flink - flink-rpc-akka_${scala.binary.version} + flink-rpc-akka-loader ${project.version} @@ -175,7 +175,14 @@ under the License. org.apache.flink - flink-rpc-akka_${scala.binary.version} + flink-rpc-akka + ${project.version} + test + + + + org.apache.flink + flink-rpc-akka ${project.version} test test-jar @@ -304,6 +311,33 @@ under the License. + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-rpc-akka-jars + process-resources + + copy + + + + + org.apache.flink + flink-rpc-akka + ${project.version} + jar + true + flink-rpc-akka.jar + + + ${project.build.directory}/classes + + + + diff --git a/flink-state-backends/flink-statebackend-changelog/pom.xml b/flink-state-backends/flink-statebackend-changelog/pom.xml index 0c4e8a5f20dd5..6b7725ed08370 100644 --- a/flink-state-backends/flink-statebackend-changelog/pom.xml +++ b/flink-state-backends/flink-statebackend-changelog/pom.xml @@ -31,7 +31,7 @@ under the License. .. - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog Flink : State backends : Changelog jar @@ -47,7 +47,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} provided @@ -66,7 +66,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-state-backends/flink-statebackend-heap-spillable/pom.xml b/flink-state-backends/flink-statebackend-heap-spillable/pom.xml index 8be1989ef8d6c..c07b966bf6320 100644 --- a/flink-state-backends/flink-statebackend-heap-spillable/pom.xml +++ b/flink-state-backends/flink-statebackend-heap-spillable/pom.xml @@ -31,7 +31,7 @@ under the License. .. - flink-statebackend-heap-spillable_${scala.binary.version} + flink-statebackend-heap-spillable Flink : State backends : Heap spillable jar @@ -41,7 +41,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} provided @@ -55,7 +55,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml index fc923a175cffc..9e03f4e628c1e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml +++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml @@ -76,7 +76,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 81643cd34e025..9e72a59d934c6 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -52,7 +52,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} @@ -97,7 +97,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index e8bc96f16c91c..d8a3e09898af7 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -85,7 +85,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test @@ -116,7 +116,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml index 8c13024387e6f..88a0ca7518303 100644 --- a/flink-table/flink-sql-client/pom.xml +++ b/flink-table/flink-sql-client/pom.xml @@ -128,7 +128,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test @@ -136,7 +136,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-java-bridge/pom.xml index 988b8b81e8d14..47f1264fbacf7 100644 --- a/flink-table/flink-table-api-java-bridge/pom.xml +++ b/flink-table/flink-table-api-java-bridge/pom.xml @@ -78,7 +78,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index 767641e0617d7..f636981c3cc85 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -281,7 +281,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 9f71e67ef79e1..94897529ae6f6 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -44,14 +44,14 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} compile org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar compile diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 1d771bd73dd39..a0219f1380d8f 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -87,14 +87,14 @@ under the License. org.apache.flink - flink-optimizer_${scala.binary.version} + flink-optimizer ${project.version} test org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test @@ -187,7 +187,7 @@ under the License. org.apache.flink - flink-optimizer_${scala.binary.version} + flink-optimizer ${project.version} test-jar test @@ -195,7 +195,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test @@ -249,7 +249,7 @@ under the License. org.apache.flink - flink-statebackend-changelog_${scala.binary.version} + flink-statebackend-changelog ${project.version} test diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index 6c7ecc2c86d7e..bb4101644a85e 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -63,7 +63,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test test-jar diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 22860a73f30f7..5a14a9d63f78f 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -43,7 +43,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} @@ -88,7 +88,7 @@ under the License. org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${project.version} test-jar test diff --git a/pom.xml b/pom.xml index c2dbb32ae0e70..8b022329fd505 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,6 @@ under the License. true 13.0 18.0 - 2.5.21 1.8 1.7.15 2.14.1 @@ -507,13 +506,6 @@ under the License. 2.1 - - - com.typesafe - config - 1.3.0 - - commons-logging @@ -662,56 +654,6 @@ under the License. ${scala.version} - - org.clapper - grizzled-slf4j_${scala.binary.version} - 1.3.2 - - - - com.typesafe.akka - akka-actor_${scala.binary.version} - ${akka.version} - - - - com.typesafe.akka - akka-remote_${scala.binary.version} - ${akka.version} - - - io.aeron - aeron-driver - - - io.aeron - aeron-client - - - - - - - com.typesafe.akka - akka-stream_${scala.binary.version} - ${akka.version} - - - - - com.typesafe.akka - akka-protobuf_${scala.binary.version} - ${akka.version} - - - - com.typesafe.akka - akka-slf4j_${scala.binary.version} - ${akka.version} - - org.scala-lang.modules scala-parser-combinators_${scala.binary.version} diff --git a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java index d21d26681a542..239bdd5bff0c4 100644 --- a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java +++ b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java @@ -52,6 +52,8 @@ public class ScalaSuffixChecker { // [INFO] +- org.scala-lang:scala-reflect:jar:2.11.12:test private static final Pattern scalaSuffixPattern = Pattern.compile("_2.1[0-9]"); + private static final String AKKA_RPC_MODULE_NAME = "flink-rpc-akka"; + public static void main(String[] args) throws IOException { if (args.length < 2) { System.out.println("Usage: ScalaSuffixChecker "); @@ -93,6 +95,11 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException final Matcher matcher = moduleNamePattern.matcher(line); if (matcher.matches()) { final String moduleName = stripScalaSuffix(matcher.group(1)); + // we ignored flink-rpc-akka because it is loaded through a separate class + // loader + if (moduleName.equals(AKKA_RPC_MODULE_NAME)) { + continue; + } LOG.trace("Parsing module '{}'.", moduleName); // skip: [INFO] org.apache.flink:flink-annotations:jar:1.14-SNAPSHOT @@ -103,10 +110,14 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException while (blockPattern.matcher(line).matches()) { final boolean dependsOnScala = dependsOnScala(line); final boolean isTestDependency = line.endsWith(":test"); + // we ignored flink-rpc-akka because it is loaded through a separate class + // loader + final boolean isFlinkAkkaRpc = line.contains(AKKA_RPC_MODULE_NAME); LOG.trace("\tline:{}", line); LOG.trace("\t\tdepends-on-scala:{}", dependsOnScala); LOG.trace("\t\tis-test-dependency:{}", isTestDependency); - if (dependsOnScala && !isTestDependency) { + LOG.trace("\t\tis-flink-rpc-akka:{}", isFlinkAkkaRpc); + if (dependsOnScala && !isTestDependency && !isFlinkAkkaRpc) { LOG.trace("\t\tOutbreak detected at {}!", moduleName); infected = true; }