diff --git a/docs/ops/config.md b/docs/ops/config.md index e0b9d4db714f5..9d2405e525e19 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -74,6 +74,9 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory. +- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading +user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`) + ## Advanced Options ### Compute @@ -186,7 +189,7 @@ will be used under the directory specified by jobmanager.web.tmpdir. - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. -- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) +- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints). diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java index ae94ece37133a..768de877d64ba 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java @@ -19,7 +19,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.Plan; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import java.io.File; import java.io.IOException; @@ -133,6 +133,6 @@ public static ClassLoader buildUserCodeClassLoader(List jars, List cla for (int i = 0; i < classpaths.size(); i++) { urls[i + jars.size()] = classpaths.get(i); } - return new FlinkUserCodeClassLoader(urls, parent); + return FlinkUserCodeClassLoaders.parentFirst(urls, parent); } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java index 4ec65324fcd1b..1dbc05efef463 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java @@ -18,6 +18,8 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -25,7 +27,6 @@ import java.lang.reflect.Method; import java.net.URL; -import java.net.URLClassLoader; import static org.junit.Assert.assertNotEquals; @@ -46,8 +47,8 @@ public void testTwoSeparateClassLoaders() throws Exception { final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation(); final ClassLoader parent = getClass().getClassLoader(); - final ClassLoader loader1 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent); - final ClassLoader loader2 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent); + final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent); + final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent); final String className = RocksDBStateBackend.class.getName(); @@ -69,32 +70,4 @@ public void testTwoSeparateClassLoaders() throws Exception { meth1.invoke(instance1, tempDir); meth2.invoke(instance2, tempDir); } - - // ------------------------------------------------------------------------ - - /** - * A variant of the URLClassLoader that first loads from the URLs and only after that from the parent. - */ - private static final class ChildFirstClassLoader extends URLClassLoader { - - private final ClassLoader parent; - - public ChildFirstClassLoader(URL[] urls, ClassLoader parent) { - super(urls, null); - this.parent = parent; - } - - @Override - public Class findClass(String name) throws ClassNotFoundException { - // first try to load from the URLs - // because the URLClassLoader's parent is null, this cannot implicitly load from the parent - try { - return super.findClass(name); - } - catch (ClassNotFoundException e) { - // not in the URL, check the parent - return parent.loadClass(name); - } - } - } } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 80d610aa3e819..d1005c423e23b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -27,6 +27,14 @@ public class CoreOptions { // process parameters // ------------------------------------------------------------------------ + public static final ConfigOption CLASSLOADER_RESOLVE_ORDER = ConfigOptions + .key("classloader.resolve-order") + .defaultValue("child-first"); + + // ------------------------------------------------------------------------ + // process parameters + // ------------------------------------------------------------------------ + public static final ConfigOption FLINK_JVM_OPTIONS = ConfigOptions .key("env.java.opts") .defaultValue(""); diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml new file mode 100644 index 0000000000000..db05c36e7afba --- /dev/null +++ b/flink-end-to-end-tests/pom.xml @@ -0,0 +1,107 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-parent + 1.4-SNAPSHOT + .. + + + flink-end-to-end-tests_${scala.binary.version} + flink-end-to-end-tests + + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + ClassLoaderTestProgram + package + + jar + + + ClassLoaderTestProgram + + + + org.apache.flink.streaming.tests.ClassLoaderTestProgram + + + + + org/apache/flink/streaming/tests/ClassLoaderTestProgram.class + org/apache/flink/runtime/taskmanager/TaskManager.class + .version.properties + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.7 + + + rename + package + + run + + + + + + + + + + + + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java similarity index 64% rename from flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java rename to flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java index 015f6c77ef003..3626885ddfdf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java +++ b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -15,21 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.runtime.execution.librarycache; -import java.net.URL; -import java.net.URLClassLoader; +package org.apache.flink.runtime.taskmanager; /** - * Gives the URLClassLoader a nicer name for debugging purposes. + * A {@code Taskmanager} in the same package as the proper Flink {@link TaskManager}. We use this + * to check whether Flink correctly uses the child-first {@link ClassLoader} when configured to do + * so. */ -public class FlinkUserCodeClassLoader extends URLClassLoader { - - public FlinkUserCodeClassLoader(URL[] urls) { - this(urls, FlinkUserCodeClassLoader.class.getClassLoader()); - } - - public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) { - super(urls, parent); +public class TaskManager { + public static String getMessage() { + return "Hello, World!"; } } diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java new file mode 100644 index 0000000000000..1d4ca4c592a7d --- /dev/null +++ b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; +import java.util.Properties; + +/** + * End-to-end test program for verifying that the {@code classloader.resolve-order} setting + * is being honored by Flink. We test this by creating a fake {@code TaskManager} with a single + * method that we call in the same package as the original Flink {@code TaskManager} and verify that + * we get a {@link NoSuchMethodError} if we're running with {@code parent-first} class loading + * and that we get the correct result from the method when we're running with {@code child-first} + * class loading. + */ +public class ClassLoaderTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String resolveOrder = params.getRequired("resolve-order"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env + .fromElements("Hello") + .map((MapFunction) value -> { + + String gitUrl; + + try (InputStream propFile = ClassLoaderTestProgram.class.getClassLoader().getResourceAsStream(".version.properties")) { + Properties properties = new Properties(); + properties.load(propFile); + gitUrl = properties.getProperty("git.remote.origin.url"); + } + + Enumeration resources = ClassLoaderTestProgram.class.getClassLoader().getResources( + ".version.properties"); + + StringBuilder sortedProperties = new StringBuilder(); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + try (InputStream in = url.openStream()) { + Properties properties = new Properties(); + properties.load(in); + String orderedGitUrl = properties.getProperty("git.remote.origin.url"); + sortedProperties.append(orderedGitUrl); + } + } + + if (resolveOrder.equals("parent-first")) { + try { + @SuppressWarnings("unused") + String ignored = TaskManager.getMessage(); + + throw new RuntimeException( + "TaskManager.getMessage() should not be available with parent-first " + + "ClassLoader order."); + + } catch (NoSuchMethodError e) { + // expected + } + return "NoSuchMethodError:" + gitUrl + ":" + sortedProperties; + } else if (resolveOrder.equals("child-first")) { + String message = TaskManager.getMessage(); + if (!message.equals("Hello, World!")) { + throw new RuntimeException("Wrong message from fake TaskManager."); + } + return message + ":" + gitUrl + ":" + sortedProperties; + } else { + throw new RuntimeException("Unknown resolve order: " + resolveOrder); + } + }) + .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE); + + env.execute("ClassLoader Test Program"); + } +} diff --git a/flink-end-to-end-tests/src/main/resources/.version.properties b/flink-end-to-end-tests/src/main/resources/.version.properties new file mode 100644 index 0000000000000..dc98aeaac2851 --- /dev/null +++ b/flink-end-to-end-tests/src/main/resources/.version.properties @@ -0,0 +1 @@ +git.remote.origin.url=hello-there-42 diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 425461cd58bb7..a30b711b8c922 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; @@ -250,7 +250,7 @@ public static ClassLoader retrieveClassLoader( allURLs[pos++] = url; } - return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader()); + return FlinkUserCodeClassLoaders.parentFirst(allURLs, JobClient.class.getClassLoader()); } else { throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index c8fc4e4c3a53b..038d10fdd7389 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.net.URL; +import java.net.URLClassLoader; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -63,10 +64,16 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { /** The blob service to download libraries */ private final BlobService blobService; + /** The resolve order to use when creating a {@link ClassLoader}. */ + private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; + // -------------------------------------------------------------------------------------------- - public BlobLibraryCacheManager(BlobService blobService) { + public BlobLibraryCacheManager( + BlobService blobService, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { this.blobService = checkNotNull(blobService); + this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder); } @Override @@ -112,7 +119,7 @@ public void registerTask( } cacheEntries.put(jobId, new LibraryCacheEntry( - requiredJarFiles, requiredClasspaths, urls, task)); + requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder)); } catch (Throwable t) { // rethrow or wrap ExceptionUtils.tryRethrowIOException(t); @@ -148,7 +155,7 @@ public void unregisterTask(JobID jobId, ExecutionAttemptID task) { // else has already been unregistered } } - + @Override public ClassLoader getClassLoader(JobID jobId) { checkNotNull(jobId, "The JobId must not be null."); @@ -204,7 +211,7 @@ public void shutdown() { */ private static class LibraryCacheEntry { - private final FlinkUserCodeClassLoader classLoader; + private final URLClassLoader classLoader; private final Set referenceHolders; /** @@ -242,9 +249,15 @@ private static class LibraryCacheEntry { Collection requiredLibraries, Collection requiredClasspaths, URL[] libraryURLs, - ExecutionAttemptID initialReference) { + ExecutionAttemptID initialReference, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { + + this.classLoader = + FlinkUserCodeClassLoaders.create( + classLoaderResolveOrder, + libraryURLs, + FlinkUserCodeClassLoaders.class.getClassLoader()); - this.classLoader = new FlinkUserCodeClassLoader(libraryURLs); // NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons // see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS // -> alternatively, compare their string representation diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java new file mode 100644 index 0000000000000..ef36c365a146e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution.librarycache; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; + +/** + * Gives the URLClassLoader a nicer name for debugging purposes. + */ +public class FlinkUserCodeClassLoaders { + + public static URLClassLoader parentFirst(URL[] urls) { + return new ParentFirstClassLoader(urls); + } + + public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) { + return new ParentFirstClassLoader(urls, parent); + } + + public static URLClassLoader childFirst(URL[] urls, ClassLoader parent) { + return new ChildFirstClassLoader(urls, parent); + } + + public static URLClassLoader create( + ResolveOrder resolveOrder, URL[] urls, ClassLoader parent) { + + switch (resolveOrder) { + case CHILD_FIRST: + return childFirst(urls, parent); + case PARENT_FIRST: + return parentFirst(urls, parent); + default: + throw new IllegalArgumentException("Unkown class resolution order: " + resolveOrder); + } + } + + /** + * Class resolution order for Flink URL {@link ClassLoader}. + */ + public enum ResolveOrder { + CHILD_FIRST, PARENT_FIRST; + + public static ResolveOrder fromString(String resolveOrder) { + if (resolveOrder.equalsIgnoreCase("parent-first")) { + return PARENT_FIRST; + } else if (resolveOrder.equalsIgnoreCase("child-first")) { + return CHILD_FIRST; + } else { + throw new IllegalArgumentException("Unknown resolve order: " + resolveOrder); + } + } + } + + /** + * Regular URLClassLoader that first loads from the parent and only after that form the URLs. + */ + static class ParentFirstClassLoader extends URLClassLoader { + + ParentFirstClassLoader(URL[] urls) { + this(urls, FlinkUserCodeClassLoaders.class.getClassLoader()); + } + + ParentFirstClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + } + + /** + * A variant of the URLClassLoader that first loads from the URLs and only after that from the parent. + * + *

{@link #getResourceAsStream(String)} uses {@link #getResource(String)} internally so we + * don't override that. + */ + static final class ChildFirstClassLoader extends URLClassLoader { + + public ChildFirstClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + + @Override + protected synchronized Class loadClass( + String name, boolean resolve) throws ClassNotFoundException { + + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c == null) { + try { + // check the URLs + c = findClass(name); + } catch (ClassNotFoundException e) { + // let URLClassLoader do it, which will eventually call the parent + c = super.loadClass(name, resolve); + } + } + + if (resolve) { + resolveClass(c); + } + + return c; + } + + @Override + public URL getResource(String name) { + // first, try and find it via the URLClassloader + URL urlClassLoaderResource = findResource(name); + + if (urlClassLoaderResource != null) { + return urlClassLoaderResource; + } + + // delegate to super + return super.getResource(name); + } + + @Override + public Enumeration getResources(String name) throws IOException { + // first get resources from URLClassloader + Enumeration urlClassLoaderResources = findResources(name); + + final List result = new ArrayList<>(); + + while (urlClassLoaderResources.hasMoreElements()) { + result.add(urlClassLoaderResources.nextElement()); + } + + // get parent urls + Enumeration parentResources = getParent().getResources(name); + + while (parentResources.hasMoreElements()) { + result.add(parentResources.nextElement()); + } + + return new Enumeration() { + Iterator iter = result.iterator(); + + public boolean hasMoreElements() { + return iter.hasNext(); + } + + public URL nextElement() { + return iter.next(); + } + }; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index 57aeaff5c7844..f7daabbb6a31d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -20,10 +20,12 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; @@ -111,7 +113,11 @@ public static JobManagerServices fromConfiguration( Preconditions.checkNotNull(config); Preconditions.checkNotNull(blobServer); - final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer); + final String classLoaderResolveOrder = + config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + + final BlobLibraryCacheManager libraryCacheManager = + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder)); final FiniteDuration timeout; try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index b6a0637b0062c..859ffbb3623b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -941,7 +941,8 @@ private JobManagerConnection associateWithJobManager( blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices.createBlobStore()); - libraryCacheManager = new BlobLibraryCacheManager(blobCache); + libraryCacheManager = + new BlobLibraryCacheManager(blobCache, taskManagerConfiguration.getClassLoaderResolveOrder()); } catch (IOException e) { // Can't pass the IOException up - we need a RuntimeException anyway // two levels up where this is run asynchronously. Also, we don't diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 7c7693bb9a1b0..60dd6431955c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -23,9 +23,11 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Preconditions; @@ -58,6 +60,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { private final boolean exitJvmOnOutOfMemory; + private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; + public TaskManagerConfiguration( int numberSlots, String[] tmpDirectories, @@ -68,7 +72,8 @@ public TaskManagerConfiguration( Time refusedRegistrationPause, long cleanupInterval, Configuration configuration, - boolean exitJvmOnOutOfMemory) { + boolean exitJvmOnOutOfMemory, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { this.numberSlots = numberSlots; this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories); @@ -79,6 +84,7 @@ public TaskManagerConfiguration( this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause); this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory; + this.classLoaderResolveOrder = classLoaderResolveOrder; } public int getNumberSlots() { @@ -120,6 +126,10 @@ public boolean shouldExitJvmOnOutOfMemoryError() { return exitJvmOnOutOfMemory; } + public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() { + return classLoaderResolveOrder; + } + // -------------------------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------------------------- @@ -212,6 +222,10 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY); + final String classLoaderResolveOrder = + configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + + return new TaskManagerConfiguration( numberSlots, tmpDirPaths, @@ -222,6 +236,7 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura refusedRegistrationPause, cleanupInterval, configuration, - exitOnOom); + exitOnOom, + FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder)); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 276e0ffc8829a..67ffb32a201c6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -47,6 +47,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ @@ -2467,6 +2468,8 @@ object JobManager { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) + val classLoaderResolveOrder = configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER) + val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT) @@ -2497,7 +2500,8 @@ object JobManager { blobServer = new BlobServer(configuration, blobStore) instanceManager = new InstanceManager() scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) - libraryCacheManager = new BlobLibraryCacheManager(blobServer) + libraryCacheManager = + new BlobLibraryCacheManager(blobServer, ResolveOrder.fromString(classLoaderResolveOrder)) instanceManager.addInstanceListener(scheduler) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 7f4308ecf1dcf..cedf60776dd9c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.minicluster -import java.net.URL +import java.net.{URL, URLClassLoader} import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} @@ -33,7 +33,7 @@ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph @@ -668,7 +668,7 @@ abstract class FlinkMiniCluster( private def createUserCodeClassLoader( jars: java.util.List[Path], classPaths: java.util.List[URL], - parentClassLoader: ClassLoader): FlinkUserCodeClassLoader = { + parentClassLoader: ClassLoader): URLClassLoader = { val urls = new Array[URL](jars.size() + classPaths.size()) @@ -686,6 +686,6 @@ abstract class FlinkMiniCluster( counter += 1 } - new FlinkUserCodeClassLoader(urls, parentClassLoader) + FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader) } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 431adb6f8b122..94f375a47118c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -970,7 +970,7 @@ class TaskManager( highAvailabilityServices.createBlobStore()) blobCache = Option(blobcache) libraryCacheManager = Some( - new BlobLibraryCacheManager(blobcache)) + new BlobLibraryCacheManager(blobcache, config.getClassLoaderResolveOrder())) } catch { case e: Exception => diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index a4b48e80cdd1b..3241b863edc60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -95,7 +95,7 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId1); cache.registerJob(jobId2); @@ -227,7 +227,7 @@ public void testLibraryCacheManagerTaskCleanup() throws IOException, Interrupted bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId); assertEquals(0, libCache.getNumberOfManagedJobs()); @@ -341,7 +341,7 @@ public void testLibraryCacheManagerMixedJobTaskCleanup() throws IOException, Int bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId); assertEquals(0, libCache.getNumberOfManagedJobs()); @@ -448,7 +448,7 @@ public void testRegisterAndDownload() throws IOException { BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18}); uploader.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); assertEquals(0, libCache.getNumberOfManagedJobs()); checkFileCountForJob(2, jobId, server); checkFileCountForJob(0, jobId, cache); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index e52310e6b361d..979b940f52f22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -86,7 +86,7 @@ public void testRecoveryRegisterAndDownload() throws Exception { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); - libServer[i] = new BlobLibraryCacheManager(server[i]); + libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); } // Random data diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 72f3a88117b24..d0af88d94eb7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -195,7 +196,7 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { instanceManager, scheduler, blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, @@ -368,7 +369,7 @@ public void testFailingJobRecovery() throws Exception { mock(InstanceManager.class), mock(Scheduler.class), blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), ActorRef.noSender(), new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 64cc13b624094..0f21b55b97cfe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; @@ -112,7 +112,7 @@ public void testHeartbeatTimeoutWithTaskManager() throws Exception { null, mock(OnCompletionActions.class), testingFatalErrorHandler, - new FlinkUserCodeClassLoader(new URL[0])); + FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader())); CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -213,7 +213,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { null, mock(OnCompletionActions.class), testingFatalErrorHandler, - new FlinkUserCodeClassLoader(new URL[0])); + FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader())); CompletableFuture startFuture = jobMaster.start(jobMasterId, testingTimeout); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index b853b148c9e3d..5ff60229c5e15 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.InstanceManager; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -189,7 +190,7 @@ private Props createJobManagerProps(Configuration configuration) throws Exceptio new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeoutAsFiniteDuration(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 410b93e7c7bd8..052699abbbc05 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -121,7 +122,8 @@ public void testComponentsStartupShutdown() throws Exception { Time.seconds(10), 1000000, // cleanup interval config, - false); // exit-jvm-on-fatal-error + false, // exit-jvm-on-fatal-error + FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); final int networkBufNum = 32; // note: the network buffer memory configured here is not actually used below but set diff --git a/pom.xml b/pom.xml index 229e93d6f12a2..567f8f1b7c2f4 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ under the License. flink-examples flink-clients flink-tests + flink-end-to-end-tests flink-test-utils-parent flink-libraries flink-scala-shell diff --git a/test-infra/end-to-end-test/test_streaming_classloader.sh b/test-infra/end-to-end-test/test_streaming_classloader.sh new file mode 100755 index 0000000000000..efbf98e1c312c --- /dev/null +++ b/test-infra/end-to-end-test/test_streaming_classloader.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +set -e +set -o pipefail + +# Convert relative path to absolute path +TEST_ROOT=`pwd` +TEST_INFRA_DIR="$0" +TEST_INFRA_DIR=`dirname "$TEST_INFRA_DIR"` +cd $TEST_INFRA_DIR +TEST_INFRA_DIR=`pwd` +cd $TEST_ROOT + +. "$TEST_INFRA_DIR"/common.sh + +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/target/ClassLoaderTestProgram.jar + +# kill any remaining JobManagers/TaskManagers at the end +trap 'pkill -f "JobManager|TaskManager"' EXIT + +echo "Testing parent-first class loading" + +# retrieve git.remote.origin.url from .version.properties +GIT_REMOTE_URL=`grep "git\.remote\.origin\.url" $TEST_INFRA_DIR/../../flink-runtime/src/main/resources/.version.properties \ + |cut -d'=' -f2 \ + |sed -e 's/\\\:/:/g'` + +# remove any leftover classloader settings +sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml" +echo "classloader.resolve-order: parent-first" >> "$FLINK_DIR/conf/flink-conf.yaml" + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order parent-first --output $TEST_DATA_DIR/out/cl_out_pf + +stop_cluster + +# remove classloader settings again +sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml + +OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_pf` +# first field: whether we found the method on TaskManager +# result of getResource(".version.properties"), should be from the parent +# ordered result of getResources(".version.properties"), should have parent first +EXPECTED="NoSuchMethodError:${GIT_REMOTE_URL}:${GIT_REMOTE_URL}hello-there-42" +if [[ "$OUTPUT" != "$EXPECTED" ]]; then + echo "Output from Flink program does not match expected output." + echo -e "EXPECTED: $EXPECTED" + echo -e "ACTUAL: $OUTPUT" + PASS="" +fi + +echo "Testing child-first class loading" + +# remove any leftover classloader settings +sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml" +echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml" + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order child-first --output $TEST_DATA_DIR/out/cl_out_cf + +stop_cluster + +# remove classloader settings again +sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml + +OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf` +# first field: whether we found the method on TaskManager +# result of getResource(".version.properties"), should be from the child +# ordered result of getResources(".version.properties"), should be child first +EXPECTED="Hello, World!:hello-there-42:hello-there-42${GIT_REMOTE_URL}" +if [[ "$OUTPUT" != "$EXPECTED" ]]; then + echo "Output from Flink program does not match expected output." + echo -e "EXPECTED: $EXPECTED" + echo -e "ACTUAL: $OUTPUT" + PASS="" +fi + +clean_data_dir +check_all_pass diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index 5232c649c844a..77bad24c4c592 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -423,6 +423,12 @@ case $TEST in printf "==============================================================================\n" test-infra/end-to-end-test/test_streaming_kafka010.sh build-target cluster EXIT_CODE=$(($EXIT_CODE+$?)) + + printf "\n==============================================================================\n" + printf "Running class loading end-to-end test\n" + printf "==============================================================================\n" + test-infra/end-to-end-test/test_streaming_classloader.sh build-target cluster + EXIT_CODE=$(($EXIT_CODE+$?)) else printf "\n==============================================================================\n" printf "Previous build failure detected, skipping end-to-end tests.\n"