Skip to content

Commit

Permalink
[FLINK-17554] Allow to register release hooks for the classloader
Browse files Browse the repository at this point in the history
This closes apache#13355
  • Loading branch information
rmetzger committed Sep 16, 2020
1 parent 5ed614c commit 328c901
Show file tree
Hide file tree
Showing 44 changed files with 434 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ public interface RuntimeContext {
*/
ClassLoader getUserCodeClassLoader();

/**
* Registers a custom hook for the user code class loader release.
*
* <p>The release hook is executed just before the user code class loader is being released.
* Registration only happens if no hook has been registered under this name already.
*
* @param releaseHookName name of the release hook.
* @param releaseHook release hook which is executed just before the user code class loader is being released
*/
@PublicEvolving
void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook);

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.io.Serializable;
import java.util.Map;
Expand All @@ -58,7 +59,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {

private final TaskInfo taskInfo;

private final ClassLoader userCodeClassLoader;
private final UserCodeClassLoader userCodeClassLoader;

private final ExecutionConfig executionConfig;

Expand All @@ -69,7 +70,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
private final MetricGroup metrics;

public AbstractRuntimeUDFContext(TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
UserCodeClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Accumulator<?, ?>> accumulators,
Map<String, Future<Path>> cpTasks,
Expand Down Expand Up @@ -159,7 +160,12 @@ public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name)

@Override
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
return this.userCodeClassLoader.asClassLoader();
}

@Override
public void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
userCodeClassLoader.registerReleaseHookIfAbsent(releaseHookName, releaseHook);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;

import java.util.HashMap;
import java.util.List;
Expand All @@ -47,7 +48,7 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
super(taskInfo, SimpleUserCodeClassLoader.create(userCodeClassLoader), executionConfig, accumulators, cpTasks, metrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* Simple {@link UserCodeClassLoader} implementation which assumes that the provided class
* loader will never be released and, hence, will never execute the release hooks.
*/
public class SimpleUserCodeClassLoader implements UserCodeClassLoader {

private final ClassLoader classLoader;

private SimpleUserCodeClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}

@Override
public ClassLoader asClassLoader() {
return classLoader;
}

@Override
public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {}

public static SimpleUserCodeClassLoader create(ClassLoader classLoader) {
return new SimpleUserCodeClassLoader(classLoader);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* UserCodeClassLoader allows to register release hooks for a user code class loader.
*
* <p>These release hooks are being executed just before the user code class loader is
* being released.
*/
public interface UserCodeClassLoader {

/**
* Obtains the actual class loader.
*
* @return actual class loader
*/
ClassLoader asClassLoader();

/**
* Registers a release hook which is being executed before the user code class
* loader is being released.
*
* @param releaseHookName
* @param releaseHook releaseHook which is executed before the user code class loader is being released.
*/
void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook);
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public ClassLoader getUserCodeClassLoader() {
return runtimeContext.getUserCodeClassLoader();
}

@Override
public void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
}

@Override
public DistributedCache getDistributedCache() {
return runtimeContext.getDistributedCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -95,6 +96,8 @@ public class SavepointEnvironment implements Environment {

private final AccumulatorRegistry accumulatorRegistry;

private final UserCodeClassLoader userCodeClassLoader;

private SavepointEnvironment(RuntimeContext ctx, Configuration configuration, int maxParallelism, int indexOfSubtask, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState) {
this.jobID = new JobID();
this.vertexID = new JobVertexID();
Expand All @@ -111,6 +114,8 @@ private SavepointEnvironment(RuntimeContext ctx, Configuration configuration, in
this.ioManager = new IOManagerAsync(ConfigurationUtils.parseTempDirectories(configuration));
this.memoryManager = MemoryManager.create(64 * 1024 * 1024, DEFAULT_PAGE_SIZE);
this.accumulatorRegistry = new AccumulatorRegistry(jobID, attemptID);

this.userCodeClassLoader = UserCodeClassLoaderRuntimeContextAdapter.from(ctx);
}

@Override
Expand Down Expand Up @@ -179,8 +184,8 @@ public MemoryManager getMemoryManager() {
}

@Override
public ClassLoader getUserClassLoader() {
return ctx.getUserCodeClassLoader();
public UserCodeClassLoader getUserCodeClassLoader() {
return userCodeClassLoader;
}

@Override
Expand Down Expand Up @@ -320,6 +325,29 @@ public SavepointEnvironment build() {
}
}

private static final class UserCodeClassLoaderRuntimeContextAdapter implements UserCodeClassLoader {

private final RuntimeContext runtimeContext;

private UserCodeClassLoaderRuntimeContextAdapter(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
}

@Override
public ClassLoader asClassLoader() {
return runtimeContext.getUserCodeClassLoader();
}

@Override
public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
}

private static UserCodeClassLoaderRuntimeContextAdapter from(RuntimeContext runtimeContext) {
return new UserCodeClassLoaderRuntimeContextAdapter(runtimeContext);
}
}

// ------------------------------------------------------------------------
// mocks / stand-ins
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public ClassLoader getUserCodeClassLoader() {
return ctx.getUserCodeClassLoader();
}

@Override
public void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
ctx.registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
}

@Override
public <V, A extends Serializable> void addAccumulator(
String name, Accumulator<V, A> accumulator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.Map;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -148,7 +149,7 @@ public interface Environment {
/**
* Returns the user code class loader
*/
ClassLoader getUserClassLoader();
UserCodeClassLoader getUserCodeClassLoader();

Map<String, Future<Path>> getDistributedCacheEntries();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkUserCodeClassLoader;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -220,7 +221,7 @@ private LibraryCacheEntry(JobID jobId) {
this.isReleased = false;
}

private ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> libraries, Collection<URL> classPaths) throws IOException {
private UserCodeClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> libraries, Collection<URL> classPaths) throws IOException {
synchronized (lockObject) {
verifyIsNotReleased();

Expand All @@ -230,7 +231,7 @@ private ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> librari
resolvedClassLoader.verifyClassLoader(libraries, classPaths);
}

return resolvedClassLoader.getClassLoader();
return resolvedClassLoader;
}
}

Expand Down Expand Up @@ -317,7 +318,7 @@ private DefaultClassLoaderLease(LibraryCacheEntry libraryCacheEntry) {
}

@Override
public ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException {
public UserCodeClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException {
verifyIsNotClosed();
return libraryCacheEntry.getOrResolveClassLoader(
requiredJarFiles,
Expand All @@ -344,7 +345,7 @@ private static DefaultClassLoaderLease create(LibraryCacheEntry libraryCacheEntr
}
}

private static final class ResolvedClassLoader {
private static final class ResolvedClassLoader implements UserCodeClassLoader {
private final URLClassLoader classLoader;

/**
Expand All @@ -363,6 +364,8 @@ private static final class ResolvedClassLoader {
*/
private final Set<String> classPaths;

private final Map<String, Runnable> releaseHooks;

private ResolvedClassLoader(URLClassLoader classLoader, Collection<PermanentBlobKey> requiredLibraries, Collection<URL> requiredClassPaths) {
this.classLoader = classLoader;

Expand All @@ -374,12 +377,20 @@ private ResolvedClassLoader(URLClassLoader classLoader, Collection<PermanentBlob
classPaths.add(url.toString());
}
this.libraries = new HashSet<>(requiredLibraries);

this.releaseHooks = new HashMap<>();
}

private URLClassLoader getClassLoader() {
@Override
public ClassLoader asClassLoader() {
return classLoader;
}

@Override
public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
releaseHooks.putIfAbsent(releaseHookName, releaseHook);
}

private void verifyClassLoader(Collection<PermanentBlobKey> requiredLibraries, Collection<URL> requiredClassPaths) {
// Make sure the previous registration referred to the same libraries and class paths.
// NOTE: the original collections may contain duplicates and may not already be Set
Expand Down Expand Up @@ -413,11 +424,29 @@ private void verifyClassLoader(Collection<PermanentBlobKey> requiredLibraries, C
* and the cached libraries are deleted immediately.
*/
private void releaseClassLoader() {
runReleaseHooks();

try {
classLoader.close();
} catch (IOException e) {
LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray()));
}
}

private void runReleaseHooks() {
Set<Map.Entry<String, Runnable>> hooks = releaseHooks.entrySet();
if (!hooks.isEmpty()) {
for (Map.Entry<String, Runnable> hookEntry : hooks) {
try {
LOG.debug("Running class loader shutdown hook: {}.", hookEntry.getKey());
hookEntry.getValue().run();
} catch (Throwable t) {
LOG.warn("Failed to run release hook '{}' for user code class loader.", hookEntry.getValue(), t);
}
}

releaseHooks.clear();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.util.UserCodeClassLoader;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -77,7 +78,7 @@ interface ClassLoaderHandle {
* @throws IOException if the required jar files cannot be downloaded
* @throws IllegalStateException if the cached user code class loader does not fulfill the requirements
*/
ClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException;
UserCodeClassLoader getOrResolveClassLoader(Collection<PermanentBlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) throws IOException;
}

/**
Expand Down
Loading

0 comments on commit 328c901

Please sign in to comment.