Skip to content

Commit

Permalink
[hotfix] Extract PluginManager interface
Browse files Browse the repository at this point in the history
  • Loading branch information
KarmaGYZ authored and tillrohrmann committed May 17, 2020
1 parent 6075ed7 commit 6b5c173
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;

import javax.annotation.concurrent.ThreadSafe;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;

/**
* Default implementation of {@link PluginManager}.
*/
@Internal
@ThreadSafe
public class DefaultPluginManager implements PluginManager {

/** Parent-classloader to all classloader that are used for plugin loading. We expect that this is thread-safe. */
private final ClassLoader parentClassLoader;

/** A collection of descriptions of all plugins known to this plugin manager. */
private final Collection<PluginDescriptor> pluginDescriptors;

/** List of patterns for classes that should always be resolved from the parent ClassLoader. */
private final String[] alwaysParentFirstPatterns;

@VisibleForTesting
DefaultPluginManager() {
parentClassLoader = null;
pluginDescriptors = null;
alwaysParentFirstPatterns = null;
}

public DefaultPluginManager(Collection<PluginDescriptor> pluginDescriptors, String[] alwaysParentFirstPatterns) {
this(pluginDescriptors, DefaultPluginManager.class.getClassLoader(), alwaysParentFirstPatterns);
}

public DefaultPluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader parentClassLoader, String[] alwaysParentFirstPatterns) {
this.pluginDescriptors = pluginDescriptors;
this.parentClassLoader = parentClassLoader;
this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
}

@Override
public <P> Iterator<P> load(Class<P> service) {
ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size());
for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
PluginLoader pluginLoader = PluginLoader.create(pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
combinedIterators.add(pluginLoader.load(service));
}
return Iterators.concat(combinedIterators.iterator());
}

@Override
public String toString() {
return "PluginManager{" +
"parentClassLoader=" + parentClassLoader +
", pluginDescriptors=" + pluginDescriptors +
", alwaysParentFirstPatterns=" + Arrays.toString(alwaysParentFirstPatterns) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,13 @@

package org.apache.flink.core.plugin;

import org.apache.flink.annotation.Internal;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;

import javax.annotation.concurrent.ThreadSafe;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;

/**
* Manager class and entry-point for the plugin mechanism in Flink.
* PluginManager is responsible for managing cluster plugins which are loaded using separate class loaders so that their dependencies
* don't interfere with Flink's dependencies.
*/
@Internal
@ThreadSafe
public class PluginManager {

/** Parent-classloader to all classloader that are used for plugin loading. We expect that this is thread-safe. */
private final ClassLoader parentClassLoader;

/** A collection of descriptions of all plugins known to this plugin manager. */
private final Collection<PluginDescriptor> pluginDescriptors;

/** List of patterns for classes that should always be resolved from the parent ClassLoader. */
private final String[] alwaysParentFirstPatterns;

public PluginManager(Collection<PluginDescriptor> pluginDescriptors, String[] alwaysParentFirstPatterns) {
this(pluginDescriptors, PluginManager.class.getClassLoader(), alwaysParentFirstPatterns);
}

public PluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader parentClassLoader, String[] alwaysParentFirstPatterns) {
this.pluginDescriptors = pluginDescriptors;
this.parentClassLoader = parentClassLoader;
this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
}
public interface PluginManager {

/**
* Returns in iterator over all available implementations of the given service interface (SPI) in all the plugins
Expand All @@ -63,21 +34,5 @@ public PluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader
* @param <P> Type of the requested plugin service.
* @return Iterator over all implementations of the given service that could be loaded from all known plugins.
*/
public <P> Iterator<P> load(Class<P> service) {
ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size());
for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
PluginLoader pluginLoader = PluginLoader.create(pluginDescriptor, parentClassLoader, alwaysParentFirstPatterns);
combinedIterators.add(pluginLoader.load(service));
}
return Iterators.concat(combinedIterators.iterator());
}

@Override
public String toString() {
return "PluginManager{" +
"parentClassLoader=" + parentClassLoader +
", pluginDescriptors=" + pluginDescriptors +
", alwaysParentFirstPatterns=" + Arrays.toString(alwaysParentFirstPatterns) +
'}';
}
<P> Iterator<P> load(Class<P> service);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ private static PluginManager createPluginManagerFromRootFolder(PluginConfig plug
try {
Collection<PluginDescriptor> pluginDescriptors =
new DirectoryBasedPluginFinder(pluginConfig.getPluginsPath().get()).findPlugins();
return new PluginManager(pluginDescriptors, pluginConfig.getAlwaysParentFirstPatterns());
return new DefaultPluginManager(pluginDescriptors, pluginConfig.getAlwaysParentFirstPatterns());
} catch (IOException e) {
throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e);
}
}
else {
return new PluginManager(Collections.emptyList(), pluginConfig.getAlwaysParentFirstPatterns());
return new DefaultPluginManager(Collections.emptyList(), pluginConfig.getAlwaysParentFirstPatterns());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.test.plugin;

import org.apache.flink.core.plugin.DefaultPluginManager;
import org.apache.flink.core.plugin.DirectoryBasedPluginFinder;
import org.apache.flink.core.plugin.PluginDescriptor;
import org.apache.flink.core.plugin.PluginFinder;
Expand All @@ -43,9 +44,9 @@
import java.util.Set;

/**
* Test for {@link PluginManager}.
* Test for {@link DefaultPluginManager}.
*/
public class PluginManagerTest extends PluginTestBase {
public class DefaultPluginManagerTest extends PluginTestBase {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand Down Expand Up @@ -84,7 +85,7 @@ public void setup() throws Exception {
public void testPluginLoading() {

String[] parentPatterns = { TestSpi.class.getName(), OtherTestSpi.class.getName() };
final PluginManager pluginManager = new PluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
final PluginManager pluginManager = new DefaultPluginManager(descriptors, PARENT_CLASS_LOADER, parentPatterns);
final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class));
Assert.assertEquals(2, serviceImplList.size());

Expand Down

0 comments on commit 6b5c173

Please sign in to comment.