Skip to content

Commit

Permalink
[FLINK-14947] Introduce LocalExecutor and make LocalEnvironment use it
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 3, 2019
1 parent 4239dd1 commit 41176ba
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* An {@link Executor} for executing a {@link Pipeline} locally.
*/
@Internal
public class LocalExecutor implements Executor {

@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);

// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

final JobGraph jobGraph = getJobGraph(pipeline, configuration);
final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

return clusterClient
.submitJob(jobGraph)
.thenApply(jobID -> new ClusterClientJobClientAdapter<MiniClusterClient.MiniClusterId>(clusterClient, jobID) {
@Override
protected void doClose() {
clusterClient.close();
shutdownMiniCluster(miniCluster);
}
});
}

private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
// for now.
if (pipeline instanceof Plan) {
Plan plan = (Plan) pipeline;
final int slotsPerTaskManager = configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
final int numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
}

return FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, 1);
}

private MiniCluster startMiniCluster(final JobGraph jobGraph, final Configuration configuration) throws Exception {
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}

int numTaskManagers = configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);

// we have to use the maximum parallelism as a default here, otherwise streaming
// pipelines would not run
int numSlotsPerTaskManager = configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
jobGraph.getMaximumParallelism());

final MiniClusterConfiguration miniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(numTaskManagers)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();

final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();

configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

return miniCluster;
}

private void shutdownMiniCluster(final MiniCluster miniCluster) {
try {
if (miniCluster != null) {
miniCluster.close();
}
} catch (Exception e) {
throw new CompletionException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;

/**
* An {@link ExecutorFactory} for {@link LocalExecutor local executors}.
*/
@Internal
public class LocalExecutorFactory implements ExecutorFactory {

public static final String NAME = "local-executor";

@Override
public boolean isCompatibleWith(final Configuration configuration) {
return NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}

@Override
public Executor getExecutor(final Configuration configuration) {
return new LocalExecutor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ public String getWebInterfaceURL() {
}
}

enum MiniClusterId {
/**
* The type of the Cluster ID for the local {@link MiniCluster}.
*/
public enum MiniClusterId {
INSTANCE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
org.apache.flink.client.deployment.executors.LocalExecutorFactory
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;

import java.util.Collections;
import org.apache.flink.configuration.DeploymentOptions;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -44,9 +40,6 @@
@Public
public class LocalEnvironment extends ExecutionEnvironment {

/** The user-defined configuration for the local execution. */
private final Configuration configuration;

/**
* Creates a new local environment.
*/
Expand All @@ -60,26 +53,20 @@ public LocalEnvironment() {
* @param config The configuration used to configure the local executor.
*/
public LocalEnvironment(Configuration config) {
super(validateAndGetConfiguration(config));
}

private static Configuration validateAndGetConfiguration(final Configuration configuration) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The LocalEnvironment cannot be instantiated when running in a pre-defined context " +
"(such as Command Line Client, Scala Shell, or TestEnvironment)");
}
this.configuration = checkNotNull(config);
}

// --------------------------------------------------------------------------------------------

@Override
public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);

final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
lastJobExecutionResult = executor.executePlan(
p,
Collections.emptyList(),
Collections.emptyList());
return lastJobExecutionResult;
final Configuration effectiveConfiguration = new Configuration(checkNotNull(configuration));
effectiveConfiguration.set(DeploymentOptions.TARGET, "local-executor");
effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
return effectiveConfiguration;
}

@Override
Expand Down

0 comments on commit 41176ba

Please sign in to comment.