Skip to content

Commit

Permalink
[FLINK-15292] Rename ExecutorFactory to PipelineExecutorFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jan 13, 2020
1 parent ff097e0 commit d82b604
Show file tree
Hide file tree
Showing 24 changed files with 73 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand Down Expand Up @@ -117,7 +117,7 @@ public static JobExecutionResult submitJobAndWaitForResult(
}

public static void executeProgram(
ExecutorServiceLoader executorServiceLoader,
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

/**
* An {@link ExecutorFactory} for {@link LocalExecutor local executors}.
* An {@link PipelineExecutorFactory} for {@link LocalExecutor local executors}.
*/
@Internal
public class LocalExecutorFactory implements ExecutorFactory {
public class LocalExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

/**
* An {@link ExecutorFactory} for {@link RemoteExecutor remote executors}.
* An {@link PipelineExecutorFactory} for {@link RemoteExecutor remote executors}.
*/
@Internal
public class RemoteExecutorFactory implements ExecutorFactory {
public class RemoteExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand All @@ -43,7 +43,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

ContextEnvironment(
final ExecutorServiceLoader executorServiceLoader,
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
super(executorServiceLoader, configuration, userCodeClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -32,14 +32,14 @@
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {

private final ExecutorServiceLoader executorServiceLoader;
private final PipelineExecutorServiceLoader executorServiceLoader;

private final Configuration configuration;

private final ClassLoader userCodeClassLoader;

public ContextEnvironmentFactory(
final ExecutorServiceLoader executorServiceLoader,
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
Expand Down Expand Up @@ -342,7 +342,7 @@ public static void main(String[] args) throws Exception {
}
}

private static final class TestExecutorServiceLoader implements ExecutorServiceLoader {
private static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {

private final ClusterClient<?> clusterClient;

Expand All @@ -354,8 +354,8 @@ private static final class TestExecutorServiceLoader implements ExecutorServiceL
}

@Override
public ExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new ExecutorFactory() {
public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new PipelineExecutorFactory() {
@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,31 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
* Java service discovery to find the available {@link ExecutorFactory executor factories}.
* The default implementation of the {@link PipelineExecutorServiceLoader}. This implementation uses
* Java service discovery to find the available {@link PipelineExecutorFactory executor factories}.
*/
@Internal
public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoader {

// TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
// The reason of this duplication is the package structure which does not allow for the ExecutorServiceLoader
// to know about the ClusterClientServiceLoader. Remove duplication when package structure has improved.

private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);

private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);

public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();

@Override
public ExecutorFactory getExecutorFactory(final Configuration configuration) {
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);

final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<ExecutorFactory> factories = defaultLoader.iterator();
final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
while (factories.hasNext()) {
try {
final ExecutorFactory factory = factories.next();
final PipelineExecutorFactory factory = factories.next();
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* based on a provided {@link Configuration}.
*/
@Internal
public interface ExecutorFactory {
public interface PipelineExecutorFactory {

/**
* Returns {@code true} if this factory is compatible with the options in the
Expand All @@ -38,5 +38,5 @@ public interface ExecutorFactory {
* Instantiates an {@link PipelineExecutor} compatible with the provided configuration.
* @return the executor instance.
*/
PipelineExecutor getExecutor(Configuration configuration);
PipelineExecutor getExecutor(final Configuration configuration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@
* execute a given {@link org.apache.flink.api.dag.Pipeline}.
*/
@Internal
public interface ExecutorServiceLoader {
public interface PipelineExecutorServiceLoader {

/**
* Loads the {@link ExecutorFactory} which is compatible with the provided configuration.
* Loads the {@link PipelineExecutorFactory} which is compatible with the provided configuration.
* There can be at most one compatible factory among the available ones, otherwise an exception
* will be thrown.
*
* @return a compatible {@link ExecutorFactory}.
* @return a compatible {@link PipelineExecutorFactory}.
* @throws Exception if there is more than one compatible factories, or something went wrong when
* loading the registered factories.
*/
ExecutorFactory getExecutorFactory(final Configuration configuration) throws Exception;
PipelineExecutorFactory getExecutorFactory(final Configuration configuration) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -133,7 +133,7 @@ public class ExecutionEnvironment {
/** Flag to indicate whether sinks have been cleared in previous executions. */
private boolean wasExecuted = false;

private final ExecutorServiceLoader executorServiceLoader;
private final PipelineExecutorServiceLoader executorServiceLoader;

private final Configuration configuration;

Expand All @@ -154,12 +154,12 @@ public ExecutionEnvironment(final Configuration configuration) {
* Creates a new {@link ExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the {@link ExecutorServiceLoader} and
* <p>In addition, this constructor allows specifying the {@link PipelineExecutorServiceLoader} and
* user code {@link ClassLoader}.
*/
@PublicEvolving
public ExecutionEnvironment(
final ExecutorServiceLoader executorServiceLoader,
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userClassloader) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
Expand All @@ -180,7 +180,7 @@ public ClassLoader getUserCodeClassLoader() {
}

@Internal
public ExecutorServiceLoader getExecutorServiceLoader() {
public PipelineExecutorServiceLoader getExecutorServiceLoader() {
return executorServiceLoader;
}

Expand Down Expand Up @@ -898,7 +898,7 @@ public JobClient executeAsync(String jobName) throws Exception {
consolidateParallelismDefinitionsInConfiguration();

final Plan plan = createProgramPlan(jobName);
final ExecutorFactory executorFactory =
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

checkNotNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

import org.junit.Test;

Expand All @@ -35,7 +35,7 @@
import static org.hamcrest.core.Is.is;

/**
* Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls of the {@link JobClient}.
* Tests the {@link PipelineExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls of the {@link JobClient}.
*/
public class ExecutorDiscoveryAndJobClientTest {

Expand Down Expand Up @@ -69,10 +69,10 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi
}

/**
* An {@link ExecutorFactory} that returns an {@link PipelineExecutor} that instead of executing, it simply
* An {@link PipelineExecutorFactory} that returns an {@link PipelineExecutor} that instead of executing, it simply
* returns its name in the {@link JobExecutionResult}.
*/
public static class IDReportingExecutorFactory implements ExecutorFactory {
public static class IDReportingExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

import javax.annotation.Nonnull;

/**
* An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
* An {@link PipelineExecutorFactory} for executing jobs on an existing (session) cluster.
*/
@Internal
public class KubernetesSessionClusterExecutorFactory implements ExecutorFactory {
public class KubernetesSessionClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;

Expand Down Expand Up @@ -147,7 +147,7 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig

@PublicEvolving
public RemoteStreamEnvironment(
final ExecutorServiceLoader executorServiceLoader,
final PipelineExecutorServiceLoader executorServiceLoader,
final String host,
final int port,
final Configuration clientConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
Expand Down Expand Up @@ -164,7 +164,7 @@ public class StreamExecutionEnvironment {

protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();

private final ExecutorServiceLoader executorServiceLoader;
private final PipelineExecutorServiceLoader executorServiceLoader;

private final Configuration configuration;

Expand Down Expand Up @@ -196,12 +196,12 @@ public StreamExecutionEnvironment(final Configuration configuration) {
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the {@link ExecutorServiceLoader} and
* <p>In addition, this constructor allows specifying the {@link PipelineExecutorServiceLoader} and
* user code {@link ClassLoader}.
*/
@PublicEvolving
public StreamExecutionEnvironment(
final ExecutorServiceLoader executorServiceLoader,
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userClassloader) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
Expand Down Expand Up @@ -1731,7 +1731,7 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {

consolidateParallelismDefinitionsInConfiguration();

final ExecutorFactory executorFactory =
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

checkNotNull(
Expand Down
Loading

0 comments on commit d82b604

Please sign in to comment.