Skip to content

Commit

Permalink
[FLINK-14972] Implement RemoteExecutor + make RemoteEnvironment use it
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 3, 2019
1 parent 814a9fa commit 057c036
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.core.execution.Executor;

/**
 * An {@link Executor} for running jobs on a cluster that is already up.
 *
 * <p>All behavior is inherited from {@link AbstractSessionClusterExecutor}; this class only
 * supplies a {@link StandaloneClientFactory} to it.
 */
@Internal
public class RemoteExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {

	/** The name under which this executor is identified. */
	public static final String NAME = "remote-cluster";

	/** Creates the executor, handing a new {@link StandaloneClientFactory} to the parent class. */
	public RemoteExecutor() {
		super(new StandaloneClientFactory());
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.executors;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;

/**
 * The {@link ExecutorFactory} responsible for creating {@link RemoteExecutor} instances.
 */
@Internal
public class RemoteExecutorFactory implements ExecutorFactory {

	/**
	 * Tells whether the given configuration selects the remote executor.
	 *
	 * @param configuration the configuration whose {@link DeploymentOptions#TARGET} value is read
	 * @return {@code true} if that value equals {@link RemoteExecutor#NAME}, ignoring case
	 */
	@Override
	public boolean isCompatibleWith(final Configuration configuration) {
		final String target = configuration.get(DeploymentOptions.TARGET);
		return RemoteExecutor.NAME.equalsIgnoreCase(target);
	}

	/**
	 * Creates a new {@link RemoteExecutor}. The passed configuration is not read here.
	 *
	 * @param configuration unused by this factory
	 * @return a newly constructed {@link RemoteExecutor}
	 */
	@Override
	public Executor getExecutor(final Configuration configuration) {
		return new RemoteExecutor();
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# limitations under the License.

org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
org.apache.flink.client.deployment.executors.RemoteExecutorFactory
org.apache.flink.client.deployment.executors.LocalExecutorFactory
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,26 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.util.JarUtils;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
Expand All @@ -46,21 +54,6 @@
@Public
public class RemoteEnvironment extends ExecutionEnvironment {

/** The hostname of the JobManager. */
private final String host;

/** The port of the JobManager main actor system. */
private final int port;

/** The jar files that need to be attached to each job. */
private final List<URL> jarFiles;

/** The configuration used by the client that connects to the cluster. */
private Configuration clientConfiguration;

/** The classpaths that need to be attached to each job. */
private final List<URL> globalClasspaths;

/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
* given host name and port.
Expand All @@ -74,7 +67,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
* provided in the JAR files.
*/
public RemoteEnvironment(String host, int port, String... jarFiles) {
this(host, port, null, jarFiles, null);
this(host, port, new Configuration(), jarFiles, null);
}

/**
Expand Down Expand Up @@ -111,58 +104,94 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig, Stri
* protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
* The protocol must be supported by the {@link java.net.URLClassLoader}.
*/
public RemoteEnvironment(String host, int port, Configuration clientConfig,
String[] jarFiles, URL[] globalClasspaths) {
public RemoteEnvironment(String host, int port, Configuration clientConfig, String[] jarFiles, URL[] globalClasspaths) {
super(validateAndGetEffectiveConfiguration(clientConfig, host, port, jarFiles, globalClasspaths));
}

/**
 * Validates the host and port, then assembles the configuration handed to the parent constructor.
 *
 * <p>The individual inputs are normalized first (client configuration, jar files, classpaths)
 * and then merged via {@code getEffectiveConfiguration}.
 */
private static Configuration validateAndGetEffectiveConfiguration(
		final Configuration configuration,
		final String host,
		final int port,
		final String[] jarFiles,
		final URL[] globalClasspaths) {
	validate(host, port);

	// Resolved in the same order in which they are passed on below.
	final Configuration baseConfiguration = getClientConfiguration(configuration);
	final List<URL> jarUrls = getJarFiles(jarFiles);
	final List<URL> classpathUrls = getClasspathURLs(globalClasspaths);

	return getEffectiveConfiguration(baseConfiguration, host, port, jarUrls, classpathUrls);
}

private static void validate(final String host, final int port) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
"(such as Command Line Client, Scala Shell, or TestEnvironment)");
}
if (host == null) {
throw new NullPointerException("Host must not be null.");
}
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}

this.host = host;
this.port = port;
this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
if (jarFiles != null) {
this.jarFiles = new ArrayList<>(jarFiles.length);
for (String jarFile : jarFiles) {
try {
this.jarFiles.add(new File(jarFile).getAbsoluteFile().toURI().toURL());
} catch (MalformedURLException e) {
throw new IllegalArgumentException("JAR file path invalid", e);
}
}
}
else {
this.jarFiles = Collections.emptyList();
}
checkNotNull(host);
checkArgument(port > 0 && port < 0xffff);
}

if (globalClasspaths == null) {
this.globalClasspaths = Collections.emptyList();
} else {
this.globalClasspaths = Arrays.asList(globalClasspaths);
}
/**
 * Returns the given configuration, or a newly created {@link Configuration} if {@code null} was passed.
 */
private static Configuration getClientConfiguration(final Configuration configuration) {
	if (configuration != null) {
		return configuration;
	}
	return new Configuration();
}

// ------------------------------------------------------------------------
/**
 * Wraps the given classpath URLs in a list.
 *
 * @param classpaths the classpath URLs, may be {@code null}
 * @return a list view of {@code classpaths}, or an empty list if {@code classpaths} is {@code null}
 */
private static List<URL> getClasspathURLs(final URL[] classpaths) {
	if (classpaths != null) {
		return Arrays.asList(classpaths);
	}
	return Collections.emptyList();
}

@Override
public JobExecutionResult execute(String jobName) throws Exception {
final Plan p = createProgramPlan(jobName);
/**
 * Turns the given jar file paths into URLs of their absolute locations and passes each URL
 * to {@code JarUtils.checkJarFile}.
 *
 * @param jars paths of the jar files, may be {@code null}
 * @return one URL per given path, or an empty list if {@code jars} is {@code null}
 * @throws IllegalArgumentException if a path cannot be converted into a URL
 * @throws RuntimeException if processing a jar file fails with an {@link IOException}
 */
private static List<URL> getJarFiles(final String[] jars) {
	if (jars == null) {
		return Collections.emptyList();
	}

	return Arrays.stream(jars)
		.map(path -> {
			final URL jarUrl;
			try {
				jarUrl = new File(path).getAbsoluteFile().toURI().toURL();
				JarUtils.checkJarFile(jarUrl);
			} catch (MalformedURLException e) {
				// Must stay ahead of the IOException handler: MalformedURLException is a subtype of it.
				throw new IllegalArgumentException("JAR file path invalid", e);
			} catch (IOException e) {
				throw new RuntimeException("Problem with jar file " + path, e);
			}
			return jarUrl;
		})
		.collect(Collectors.toList());
}

private static Configuration getEffectiveConfiguration(
final Configuration baseConfiguration,
final String host,
final int port,
final List<URL> jars,
final List<URL> classpaths) {

final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
lastJobExecutionResult = executor.executePlan(p, jarFiles, globalClasspaths);
return lastJobExecutionResult;
final Configuration effectiveConfiguration = new Configuration(baseConfiguration);

setJobManagerAddressToConfig(host, port, effectiveConfiguration);
ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, PipelineOptions.CLASSPATHS, classpaths, URL::toString);
ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, PipelineOptions.JARS, jars, URL::toString);

// These must be set last so that they override any values from the client configuration provided in the constructor.
effectiveConfiguration.setString(DeploymentOptions.TARGET, "remote-cluster");
effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED, true);

return effectiveConfiguration;
}

/**
 * Writes the given host and port into the configuration, under both the
 * {@link JobManagerOptions} and the {@link RestOptions} address/port keys.
 *
 * @param host the host name to store
 * @param port the port to store
 * @param effectiveConfiguration the configuration that is modified in place
 */
private static void setJobManagerAddressToConfig(final String host, final int port, final Configuration effectiveConfiguration) {
	final InetSocketAddress address = new InetSocketAddress(host, port);
	final String hostName = address.getHostString();
	final int portNumber = address.getPort();

	effectiveConfiguration.setString(JobManagerOptions.ADDRESS, hostName);
	effectiveConfiguration.setInteger(JobManagerOptions.PORT, portNumber);
	effectiveConfiguration.setString(RestOptions.ADDRESS, hostName);
	effectiveConfiguration.setInteger(RestOptions.PORT, portNumber);
}

@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ").";
final String host = getConfiguration().getString(JobManagerOptions.ADDRESS);
final int port = getConfiguration().getInteger(JobManagerOptions.PORT);
final String parallelism = (getParallelism() == -1 ? "default" : "" + getParallelism());

return "Remote Environment (" + host + ":" + port + " - parallelism = " + parallelism + ").";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public Long map(Long value) throws Exception {
Throwable error = errorRef[0];
assertNotNull("The program did not fail properly", error);

assertTrue(error instanceof ProgramInvocationException);
assertTrue(error.getCause() instanceof ProgramInvocationException);
// all seems well :-)
}
catch (Exception e) {
Expand Down

0 comments on commit 057c036

Please sign in to comment.