[FLINK-15292] Rename Executor to PipelineExecutor
kl0u committed Jan 13, 2020
1 parent 97535b3 commit ff097e0
Showing 24 changed files with 57 additions and 56 deletions.
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.PipelineExecutor;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -36,7 +37,7 @@
/**
* A generic implementation of the {@link CustomCommandLine} that only expects
* the execution.target parameter to be explicitly specified and simply forwards the
* rest of the options specified with -D to the corresponding {@link org.apache.flink.core.execution.Executor}
* rest of the options specified with -D to the corresponding {@link PipelineExecutor}
* for further parsing.
*/
@Internal
@@ -23,8 +23,8 @@
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;

import org.slf4j.Logger;
@@ -37,13 +37,13 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
* An abstract {@link PipelineExecutor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
*
* @param <ClusterID> the type of the id of the cluster.
* @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
*/
@Internal
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);

@@ -23,8 +23,8 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;

import javax.annotation.Nonnull;
@@ -35,13 +35,13 @@
import static org.apache.flink.util.Preconditions.checkState;

/**
* An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
* An abstract {@link PipelineExecutor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
*
* @param <ClusterID> the type of the id of the cluster.
* @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
*/
@Internal
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

private final ClientFactory clusterClientFactory;

@@ -30,8 +30,8 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
@@ -44,10 +44,10 @@
import static org.apache.flink.util.Preconditions.checkState;

/**
* An {@link Executor} for executing a {@link Pipeline} locally.
* A {@link PipelineExecutor} for executing a {@link Pipeline} locally.
*/
@Internal
public class LocalExecutor implements Executor {
public class LocalExecutor implements PipelineExecutor {

public static final String NAME = "local";

@@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;

/**
* An {@link ExecutorFactory} for {@link LocalExecutor local executors}.
@@ -36,7 +36,7 @@ public boolean isCompatibleWith(final Configuration configuration) {
}

@Override
public Executor getExecutor(final Configuration configuration) {
public PipelineExecutor getExecutor(final Configuration configuration) {
return new LocalExecutor();
}
}
@@ -22,10 +22,10 @@
import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.PipelineExecutor;

/**
* The {@link Executor} to be used when executing a job on an already running cluster.
* The {@link PipelineExecutor} to be used when executing a job on an already running cluster.
*/
@Internal
public class RemoteExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
@@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;

/**
* An {@link ExecutorFactory} for {@link RemoteExecutor remote executors}.
@@ -36,7 +36,7 @@ public boolean isCompatibleWith(final Configuration configuration) {
}

@Override
public Executor getExecutor(final Configuration configuration) {
public PipelineExecutor getExecutor(final Configuration configuration) {
return new RemoteExecutor();
}
}
@@ -41,9 +41,9 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -362,7 +362,7 @@ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
}

@Override
public Executor getExecutor(@Nonnull Configuration configuration) {
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, config) -> {
final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
@@ -22,7 +22,7 @@
import org.apache.flink.configuration.Configuration;

/**
* A factory for selecting and instantiating the adequate {@link Executor}
* A factory for selecting and instantiating the adequate {@link PipelineExecutor}
* based on a provided {@link Configuration}.
*/
@Internal
@@ -35,8 +35,8 @@ public interface ExecutorFactory {
boolean isCompatibleWith(final Configuration configuration);

/**
* Instantiates an {@link Executor} compatible with the provided configuration.
* Instantiates a {@link PipelineExecutor} compatible with the provided configuration.
* @return the executor instance.
*/
Executor getExecutor(Configuration configuration);
PipelineExecutor getExecutor(Configuration configuration);
}
@@ -22,7 +22,7 @@
import org.apache.flink.configuration.Configuration;

/**
* An interface to be implemented by the entity responsible for finding the correct {@link Executor} to
* An interface to be implemented by the entity responsible for finding the correct {@link PipelineExecutor} to
* execute a given {@link org.apache.flink.api.dag.Pipeline}.
*/
@Internal
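
Note (an illustration, not part of this commit): the ExecutorFactory interface from the hunk above is what an executor service loader searches at run time to find a matching executor for a given configuration. Below is a minimal sketch of that lookup, assuming factories are registered as JDK services; the class name, method name, and error message are invented for the example. Flink's own loader follows a similar discovery pattern, but its exact implementation is not shown in this diff.

import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;

/** Illustrative lookup: find a compatible ExecutorFactory and ask it for a PipelineExecutor. */
public final class IllustrativeExecutorLookup {

    public static PipelineExecutor forConfiguration(final Configuration configuration) {
        // ExecutorFactory implementations are assumed to be registered as JDK services.
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            // isCompatibleWith(...) is the selection hook shown in the ExecutorFactory hunk above.
            if (factory.isCompatibleWith(configuration)) {
                return factory.getExecutor(configuration);
            }
        }
        throw new IllegalStateException("No ExecutorFactory is compatible with the given configuration.");
    }
}
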
@@ -28,7 +28,7 @@
* The entity responsible for executing a {@link Pipeline}, i.e. a user job.
*/
@Internal
public interface Executor {
public interface PipelineExecutor {

/**
* Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to
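
Note (an illustration, not part of this commit): the renamed PipelineExecutor appears to keep a single execute method, which is why test factories later in this diff implement it with a lambda. Below is a hedged sketch of a factory/executor pair; NoOpExecutorFactory and the "no-op" target value are made up here, and the (pipeline, configuration) -> CompletableFuture<JobClient> shape is inferred from the lambdas that appear further down.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;

/** Illustrative factory whose PipelineExecutor completes immediately instead of running the job. */
public class NoOpExecutorFactory implements ExecutorFactory {

    @Override
    public boolean isCompatibleWith(final Configuration configuration) {
        // "no-op" is a made-up execution.target value used only in this sketch.
        return "no-op".equals(configuration.getString(DeploymentOptions.TARGET));
    }

    @Override
    public PipelineExecutor getExecutor(final Configuration configuration) {
        // A lambda suffices because PipelineExecutor exposes a single execute method;
        // a real executor would submit the pipeline and complete the future with a live JobClient.
        return (pipeline, executionConfig) -> CompletableFuture.completedFuture(null);
    }
}
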
@@ -59,6 +59,7 @@
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.ExceptionUtils;
@@ -142,7 +143,7 @@ public class ExecutionEnvironment {

/**
* Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
* configure the {@link org.apache.flink.core.execution.Executor}.
* configure the {@link PipelineExecutor}.
*/
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration) {
@@ -151,7 +152,7 @@ public ExecutionEnvironment(final Configuration configuration) {

/**
* Creates a new {@link ExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link org.apache.flink.core.execution.Executor}.
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the {@link ExecutorServiceLoader} and
* user code {@link ClassLoader}.
@@ -993,7 +994,7 @@ protected void registerCachedFilesWithPlan(Plan p) throws IOException {
/**
* Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with an
* {@link org.apache.flink.core.execution.Executor}. Obtaining a plan and starting it with an
* {@link PipelineExecutor}. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program consists
* only of distributed operations.
* This automatically starts a new stage of execution.
@@ -1008,7 +1009,7 @@ public Plan createProgramPlan() {
/**
* Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with an
* {@link org.apache.flink.core.execution.Executor}. Obtaining a plan and starting it with an
* {@link PipelineExecutor}. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program consists
* only of distributed operations.
* This automatically starts a new stage of execution.
@@ -1024,7 +1025,7 @@ public Plan createProgramPlan(String jobName) {
/**
* Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with an
* {@link org.apache.flink.core.execution.Executor}. Obtaining a plan and starting it with an
* {@link PipelineExecutor}. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program consists
* only of distributed operations.
*
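
Note (an illustration, not part of this commit): the Configuration-based constructor in the hunk above is what binds an ExecutionEnvironment to a concrete PipelineExecutor. Below is a short usage sketch, assuming the execution.target key named in the first file of this commit and the "local" name declared on LocalExecutor; whether the local factory matches exactly this string is not visible in the hunks shown.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalTargetExample {

    public static void main(String[] args) throws Exception {
        // "execution.target" is the option named in the generic command line's Javadoc above;
        // "local" is the NAME constant declared on LocalExecutor earlier in this diff.
        Configuration configuration = new Configuration();
        configuration.setString("execution.target", "local");

        // The @PublicEvolving constructor from the hunk above wires the environment to
        // whichever PipelineExecutor the configured target resolves to.
        ExecutionEnvironment env = new ExecutionEnvironment(configuration);

        env.fromElements(1, 2, 3).print();
    }
}
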
@@ -22,9 +22,9 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;

import org.junit.Test;

@@ -69,7 +69,7 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi
}

/**
* An {@link ExecutorFactory} that returns an {@link Executor} that instead of executing, it simply
* An {@link ExecutorFactory} that returns a {@link PipelineExecutor} that, instead of executing, simply
* returns its name in the {@link JobExecutionResult}.
*/
public static class IDReportingExecutorFactory implements ExecutorFactory {
@@ -80,7 +80,7 @@ public boolean isCompatibleWith(Configuration configuration) {
}

@Override
public Executor getExecutor(Configuration configuration) {
public PipelineExecutor getExecutor(Configuration configuration) {
return (pipeline, executionConfig) -> CompletableFuture.completedFuture(new TestingJobClient());
}
}
@@ -20,11 +20,11 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;

/**
* The {@link Executor} to be used when executing a job on an already running cluster.
* The {@link PipelineExecutor} to be used when executing a job on an already running cluster.
*/
@Internal
public class KubernetesSessionClusterExecutor extends AbstractSessionClusterExecutor<String, KubernetesClusterClientFactory> {
@@ -21,8 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutor;

import javax.annotation.Nonnull;

@@ -39,7 +39,7 @@ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
}

@Override
public Executor getExecutor(@Nonnull final Configuration configuration) {
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
return new KubernetesSessionClusterExecutor();
}
}
@@ -17,8 +17,6 @@
*/
package org.apache.flink.api.scala

import java.util.concurrent.CompletableFuture

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Public, PublicEvolving}
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
@@ -32,7 +30,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.execution.{JobClient, JobListener, PipelineExecutor}
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -562,7 +560,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Creates the program's [[org.apache.flink.api.common.Plan]].
* The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with an
* [[org.apache.flink.core.execution.Executor]]. Obtaining a plan and starting it with an
* [[PipelineExecutor]]. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program only
* consists of distributed operations.
*/
@@ -59,6 +59,7 @@
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -184,7 +185,7 @@ public StreamExecutionEnvironment() {

/**
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link org.apache.flink.core.execution.Executor}.
* Configuration} to configure the {@link PipelineExecutor}.
*/
@PublicEvolving
public StreamExecutionEnvironment(final Configuration configuration) {
@@ -193,7 +194,7 @@ public StreamExecutionEnvironment(final Configuration configuration) {

/**
* Creates a new {@link StreamExecutionEnvironment} that will use the given {@link
* Configuration} to configure the {@link org.apache.flink.core.execution.Executor}.
* Configuration} to configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the {@link ExecutorServiceLoader} and
* user code {@link ClassLoader}.
@@ -25,9 +25,9 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -139,7 +139,7 @@ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
}

@Override
public Executor getExecutor(@Nonnull Configuration configuration) {
public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, config) -> {
assertTrue(pipeline instanceof StreamGraph);
