Skip to content

Commit

Permalink
[FLINK-7442] Add option for using a child-first classloader for loadi…
Browse files Browse the repository at this point in the history
…ng user code

This also adds an end-to-end test that verifies correct order for both
classes and resources.
  • Loading branch information
aljoscha committed Sep 21, 2017
1 parent 2a4ac66 commit a86b646
Show file tree
Hide file tree
Showing 26 changed files with 584 additions and 76 deletions.
5 changes: 4 additions & 1 deletion docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to

- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:https:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs:https://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.

- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading
user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`)

## Advanced Options

### Compute
Expand Down Expand Up @@ -186,7 +189,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.

- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file:https://` only for local setups.

- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`)
- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`)

- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.client.program;

import org.apache.flink.api.common.Plan;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -133,6 +133,6 @@ public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> cla
for (int i = 0; i < classpaths.size(); i++) {
urls[i + jars.size()] = classpaths.get(i);
}
return new FlinkUserCodeClassLoader(urls, parent);
return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.RocksDB;

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

import static org.junit.Assert.assertNotEquals;

Expand All @@ -46,8 +47,8 @@ public void testTwoSeparateClassLoaders() throws Exception {
final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation();

final ClassLoader parent = getClass().getClassLoader();
final ClassLoader loader1 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent);
final ClassLoader loader2 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent);
final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent);
final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent);

final String className = RocksDBStateBackend.class.getName();

Expand All @@ -69,32 +70,4 @@ public void testTwoSeparateClassLoaders() throws Exception {
meth1.invoke(instance1, tempDir);
meth2.invoke(instance2, tempDir);
}

// ------------------------------------------------------------------------

/**
* A variant of the URLClassLoader that first loads from the URLs and only after that from the parent.
*/
private static final class ChildFirstClassLoader extends URLClassLoader {

private final ClassLoader parent;

public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
super(urls, null);
this.parent = parent;
}

@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
// first try to load from the URLs
// because the URLClassLoader's parent is null, this cannot implicitly load from the parent
try {
return super.findClass(name);
}
catch (ClassNotFoundException e) {
// not in the URL, check the parent
return parent.loadClass(name);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public class CoreOptions {
// process parameters
// ------------------------------------------------------------------------

public static final ConfigOption<String> CLASSLOADER_RESOLVE_ORDER = ConfigOptions
.key("classloader.resolve-order")
.defaultValue("child-first");

// ------------------------------------------------------------------------
// process parameters
// ------------------------------------------------------------------------

public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
.key("env.java.opts")
.defaultValue("");
Expand Down
107 changes: 107 additions & 0 deletions flink-end-to-end-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http:https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http:https://maven.apache.org/POM/4.0.0" xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<version>1.4-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-end-to-end-tests_${scala.binary.version}</artifactId>
<name>flink-end-to-end-tests</name>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>

<executions>
<!-- ClassLoaderTestProgram -->
<execution>
<id>ClassLoaderTestProgram</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>ClassLoaderTestProgram</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.streaming.tests.ClassLoaderTestProgram</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/streaming/tests/ClassLoaderTestProgram.class</include>
<include>org/apache/flink/runtime/taskmanager/TaskManager.class</include>
<include>.version.properties</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>

<!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-end-to-end-tests_${scala.binary.version}-${project.version}-ClassLoaderTestProgram.jar" tofile="${project.basedir}/target/ClassLoaderTestProgram.jar" />
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.execution.librarycache;

import java.net.URL;
import java.net.URLClassLoader;
package org.apache.flink.runtime.taskmanager;

/**
* Gives the URLClassLoader a nicer name for debugging purposes.
* A {@code Taskmanager} in the same package as the proper Flink {@link TaskManager}. We use this
* to check whether Flink correctly uses the child-first {@link ClassLoader} when configured to do
* so.
*/
public class FlinkUserCodeClassLoader extends URLClassLoader {

public FlinkUserCodeClassLoader(URL[] urls) {
this(urls, FlinkUserCodeClassLoader.class.getClassLoader());
}

public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
public class TaskManager {
public static String getMessage() {
return "Hello, World!";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.Properties;

/**
* End-to-end test program for verifying that the {@code classloader.resolve-order} setting
* is being honored by Flink. We test this by creating a fake {@code TaskManager} with a single
* method that we call in the same package as the original Flink {@code TaskManager} and verify that
* we get a {@link NoSuchMethodError} if we're running with {@code parent-first} class loading
* and that we get the correct result from the method when we're running with {@code child-first}
* class loading.
*/
public class ClassLoaderTestProgram {

public static void main(String[] args) throws Exception {

final ParameterTool params = ParameterTool.fromArgs(args);

final String resolveOrder = params.getRequired("resolve-order");

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
.fromElements("Hello")
.map((MapFunction<String, String>) value -> {

String gitUrl;

try (InputStream propFile = ClassLoaderTestProgram.class.getClassLoader().getResourceAsStream(".version.properties")) {
Properties properties = new Properties();
properties.load(propFile);
gitUrl = properties.getProperty("git.remote.origin.url");
}

Enumeration<URL> resources = ClassLoaderTestProgram.class.getClassLoader().getResources(
".version.properties");

StringBuilder sortedProperties = new StringBuilder();
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
try (InputStream in = url.openStream()) {
Properties properties = new Properties();
properties.load(in);
String orderedGitUrl = properties.getProperty("git.remote.origin.url");
sortedProperties.append(orderedGitUrl);
}
}

if (resolveOrder.equals("parent-first")) {
try {
@SuppressWarnings("unused")
String ignored = TaskManager.getMessage();

throw new RuntimeException(
"TaskManager.getMessage() should not be available with parent-first " +
"ClassLoader order.");

} catch (NoSuchMethodError e) {
// expected
}
return "NoSuchMethodError:" + gitUrl + ":" + sortedProperties;
} else if (resolveOrder.equals("child-first")) {
String message = TaskManager.getMessage();
if (!message.equals("Hello, World!")) {
throw new RuntimeException("Wrong message from fake TaskManager.");
}
return message + ":" + gitUrl + ":" + sortedProperties;
} else {
throw new RuntimeException("Unknown resolve order: " + resolveOrder);
}
})
.writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);

env.execute("ClassLoader Test Program");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
git.remote.origin.url=hello-there-42
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
Expand Down Expand Up @@ -250,7 +250,7 @@ public static ClassLoader retrieveClassLoader(
allURLs[pos++] = url;
}

return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader());
return FlinkUserCodeClassLoaders.parentFirst(allURLs, JobClient.class.getClassLoader());
} else {
throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
}
Expand Down
Loading

0 comments on commit a86b646

Please sign in to comment.