Skip to content

Commit

Permalink
[FLINK-11952][2/4] Introduce basic plugin mechanism for Flink
Browse files Browse the repository at this point in the history
The mechanism uses child-first classloading and creates classloaders from jars that are discovered
from a directory hierarchy.
  • Loading branch information
StefanRRichter committed Apr 15, 2019
1 parent 1a10fbe commit 0c4953c
Show file tree
Hide file tree
Showing 31 changed files with 1,286 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.Plugin;

import java.io.IOException;
import java.net.URI;
Expand All @@ -31,22 +32,13 @@
* creating file systems via {@link #create(URI)}.
*/
@PublicEvolving
public interface FileSystemFactory {
public interface FileSystemFactory extends Plugin {

/**
* Gets the scheme of the file system created by this factory.
*/
String getScheme();

/**
* Applies the given configuration to this factory. All future file system
* instantiations via {@link #create(URI)} should take the configuration into
* account.
*
* @param config The configuration to apply.
*/
void configure(Configuration config);

/**
* Creates a new file system for the given file system URI.
* The URI describes the type of file system (via its scheme) and optionally the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.core.fs;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -53,11 +52,6 @@ public String getScheme() {
return "n/a";
}

@Override
public void configure(Configuration config) {
// nothing to do here
}

@Override
public FileSystem create(URI fsUri) throws IOException {
if (exceptionCause == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.core.fs.local;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

Expand All @@ -36,11 +35,6 @@ public String getScheme() {
return LocalFileSystem.getLocalFsURI().getScheme();
}

@Override
public void configure(Configuration config) {
// the local file system takes no configuration, so nothing to do here
}

@Override
public FileSystem create(URI fsUri) {
return LocalFileSystem.getSharedInstance();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import org.apache.flink.util.function.FunctionUtils;

import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.stream.Collectors;

/**
* This class is used to create a collection of {@link PluginDescriptor} based on directory structure for a given plugin
* root folder.
*
* <p>The expected structure is as follows: the given plugins root folder, containing the plugins folder. One plugin folder
* contains all resources (jar files) belonging to a plugin. The name of the plugin folder becomes the plugin id.
* <pre>
* plugins-root-folder/
* |------------plugin-a/ (folder of plugin a)
* | |-plugin-a-1.jar (the jars containing the classes of plugin a)
* | |-plugin-a-2.jar
* | |-...
* |
* |------------plugin-b/
* | |-plugin-b-1.jar
* ... |-...
* </pre>
*/
public class DirectoryBasedPluginFinder implements PluginFinder {

/** Pattern to match jar files in a directory. */
private static final String JAR_MATCHER_PATTERN = "glob:**.jar";

/** Root directory to the plugin folders. */
private final Path pluginsRootDir;

/** Matcher for jar files in the filesystem of the root folder. */
private final PathMatcher jarFileMatcher;

public DirectoryBasedPluginFinder(Path pluginsRootDir) {
this.pluginsRootDir = pluginsRootDir;
this.jarFileMatcher = pluginsRootDir.getFileSystem().getPathMatcher(JAR_MATCHER_PATTERN);
}

@Override
public Collection<PluginDescriptor> findPlugins() throws IOException {

if (!Files.isDirectory(pluginsRootDir)) {
throw new IOException("Plugins root directory [" + pluginsRootDir + "] does not exist!");
}

return Files.list(pluginsRootDir)
.filter((Path path) -> Files.isDirectory(path))
.map(FunctionUtils.uncheckedFunction(this::createPluginDescriptorForSubDirectory))
.collect(Collectors.toList());
}

private PluginDescriptor createPluginDescriptorForSubDirectory(Path subDirectory) throws IOException {
URL[] urls = createJarURLsFromDirectory(subDirectory);
Arrays.sort(urls, Comparator.comparing(URL::toString));
//TODO: This class could be extended to parse exclude-pattern from a optional text files in the plugin directories.
return new PluginDescriptor(
subDirectory.getFileName().toString(),
urls,
new String[0]);
}

private URL[] createJarURLsFromDirectory(Path subDirectory) throws IOException {
URL[] urls = Files.list(subDirectory)
.filter((Path p) -> Files.isRegularFile(p) && jarFileMatcher.matches(p))
.map(FunctionUtils.uncheckedFunction((Path p) -> p.toUri().toURL()))
.toArray(URL[]::new);

if (urls.length < 1) {
throw new IOException("Cannot find any jar files for plugin in directory [" + subDirectory + "]." +
" Please provide the jar files for the plugin or delete the directory.");
}

return urls;
}
}
37 changes: 37 additions & 0 deletions flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;

/**
* Interface for plugins. Plugins typically extend this interface in their SPI and the concrete implementations of a
* service then implement the SPI contract.
*/
@PublicEvolving
public interface Plugin {

/**
* Optional method for plugins to pick up settings from the configuration.
*
* @param config The configuration to apply to the plugin.
*/
default void configure(Configuration config) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import java.net.URL;
import java.util.Arrays;

/**
* Descriptive meta information for a plugin.
*/
public class PluginDescriptor {

/** Unique identifier of the plugin. */
private final String pluginId;

/** URLs to the plugin resources code. Usually this contains URLs of the jars that will be loaded for the plugin. */
private final URL[] pluginResourceURLs;

/**
* String patterns of classes that should be excluded from loading out of the plugin resources. See
* {@link org.apache.flink.util.ChildFirstClassLoader}'s field alwaysParentFirstPatterns.
*/
private final String[] loaderExcludePatterns;

public PluginDescriptor(String pluginId, URL[] pluginResourceURLs, String[] loaderExcludePatterns) {
this.pluginId = pluginId;
this.pluginResourceURLs = pluginResourceURLs;
this.loaderExcludePatterns = loaderExcludePatterns;
}

public String getPluginId() {
return pluginId;
}

public URL[] getPluginResourceURLs() {
return pluginResourceURLs;
}

public String[] getLoaderExcludePatterns() {
return loaderExcludePatterns;
}

@Override
public String toString() {
return "PluginDescriptor{" +
"pluginId='" + pluginId + '\'' +
", pluginResourceURLs=" + Arrays.toString(pluginResourceURLs) +
", loaderExcludePatterns=" + Arrays.toString(loaderExcludePatterns) +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import java.io.IOException;
import java.util.Collection;

/**
* Implementations of this interface provide mechanisms to locate plugins and create corresponding
* {@link PluginDescriptor} objects. The result can then be used to initialize a {@link PluginLoader}.
*/
public interface PluginFinder {

/**
* Find plugins and return a corresponding collection of {@link PluginDescriptor} instances.
*
* @return a collection of {@link PluginDescriptor} instances for all found plugins.
* @throws IOException thrown if a problem occurs during plugin search.
*/
Collection<PluginDescriptor> findPlugins() throws IOException;
}
Loading

0 comments on commit 0c4953c

Please sign in to comment.