Skip to content

Commit

Permalink
[FLINK-16245][runtime] Decouple user from context classloader
Browse files Browse the repository at this point in the history
Allows user classloader can be unloaded even if a reference on the context classloader outlives the user code.
  • Loading branch information
Arvid Heise authored and zentol committed Aug 16, 2020
1 parent ecc3429 commit eed1b58
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected FlinkUserCodeClassLoader(
}

@Override
protected final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
public final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
try {
synchronized (getClassLoadingLock(name)) {
return loadClassWithoutExceptionHandling(name, resolve);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
import org.apache.flink.util.ChildFirstClassLoader;
import org.apache.flink.util.FlinkUserCodeClassLoader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.function.Consumer;

/**
Expand All @@ -38,15 +44,19 @@ public static URLClassLoader parentFirst(
URL[] urls,
ClassLoader parent,
Consumer<Throwable> classLoadingExceptionHandler) {
return new ParentFirstClassLoader(urls, parent, classLoadingExceptionHandler);
return new SafetyNetWrapperClassLoader(
new ParentFirstClassLoader(urls, parent, classLoadingExceptionHandler),
parent);
}

public static URLClassLoader childFirst(
URL[] urls,
ClassLoader parent,
String[] alwaysParentFirstPatterns,
Consumer<Throwable> classLoadingExceptionHandler) {
return new ChildFirstClassLoader(urls, parent, alwaysParentFirstPatterns, classLoadingExceptionHandler);
return new SafetyNetWrapperClassLoader(
new ChildFirstClassLoader(urls, parent, alwaysParentFirstPatterns, classLoadingExceptionHandler),
parent);
}

public static URLClassLoader create(
Expand Down Expand Up @@ -96,4 +106,68 @@ static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
ClassLoader.registerAsParallelCapable();
}
}

/**
* Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
* the user classloader to be garbage collected (FLINK-16245).
*
* <p>This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
* and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
* classloader and most likely result in ClassNotFound exceptions.
*/
private static class SafetyNetWrapperClassLoader extends URLClassLoader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);

private volatile FlinkUserCodeClassLoader inner;

SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {
super(new URL[0], parent);
this.inner = inner;
}

@Override
public void close() {
final FlinkUserCodeClassLoader inner = this.inner;
if (inner != null) {
try {
inner.close();
} catch (IOException e) {
LOG.warn("Could not close user classloader", e);
}
}
this.inner = null;
}

private FlinkUserCodeClassLoader ensureInner() {
if (inner == null) {
throw new IllegalStateException("Trying to access closed classloader");
}
return inner;
}

@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
return ensureInner().loadClass(name);
}

@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
// called for dynamic class loading
return ensureInner().loadClass(name, resolve);
}

@Override
public URL getResource(String name) {
return ensureInner().getResource(name);
}

@Override
public Enumeration<URL> getResources(String name) throws IOException {
return ensureInner().getResources(name);
}

static {
ClassLoader.registerAsParallelCapable();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;

/**
* Tests for classloading and class loader utilities.
Expand Down Expand Up @@ -206,4 +207,28 @@ private static URLClassLoader createChildFirstClassLoader(URL childCodePath, Cla
new String[0],
NOOP_EXCEPTION_HANDLER);
}

@Test
public void testClosingOfClassloader() throws Exception {
final String className = ClassToLoad.class.getName();

final ClassLoader parentClassLoader = ClassLoader.getSystemClassLoader().getParent();

final URL childCodePath = getClass().getProtectionDomain().getCodeSource().getLocation();

final URLClassLoader childClassLoader = createChildFirstClassLoader(childCodePath, parentClassLoader);

final Class<?> loadedClass = childClassLoader.loadClass(className);

assertNotSame(ClassToLoad.class, loadedClass);

childClassLoader.close();

// after closing, no loaded class should be reachable anymore
expectedException.expect(isA(IllegalStateException.class));
childClassLoader.loadClass(className);
}

private static class ClassToLoad {
}
}

0 comments on commit eed1b58

Please sign in to comment.