Skip to content

Commit

Permalink
[FLINK-4930] [client, yarn] Implement FLIP-6 YARN client
Browse files Browse the repository at this point in the history
Summary: Implement FLIP-6 YARN client

Test Plan: NA

Reviewers: biao.liub

Differential Revision: https://phabricator.taobao.net/D6563
  • Loading branch information
tiemsn authored and StephanEwen committed Dec 23, 2016
1 parent 2a7dbda commit 3695a8e
Show file tree
Hide file tree
Showing 5 changed files with 524 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class CliFrontend {
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn");
customCommandLine.add(new DefaultCLI());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -60,6 +61,8 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -460,28 +463,6 @@ protected YarnClusterClient deployInternal() throws Exception {
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}

// ------------------ Set default file system scheme -------------------------

try {
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
} catch (IOException e) {
throw new IOException("Error while setting the default " +
"filesystem scheme from configuration.", e);
}

// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);

// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}

// ------------------ Check if the YARN ClusterClient has the requested resources --------------

// the yarnMinAllocationMB specifies the smallest possible container allocation size.
Expand All @@ -505,6 +486,7 @@ protected YarnClusterClient deployInternal() throws Exception {
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

Resource maxRes = appResponse.getMaximumResourceCapability();
final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
Expand Down Expand Up @@ -560,6 +542,45 @@ protected YarnClusterClient deployInternal() throws Exception {
}
}

ApplicationReport report = startAppMaster(null, yarnClient);

String host = report.getHost();
int port = report.getRpcPort();

// Correctly initialize the Flink config
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);

// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
}

public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient) throws Exception {

// ------------------ Set default file system scheme -------------------------

try {
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
} catch (IOException e) {
throw new IOException("Error while setting the default " +
"filesystem scheme from configuration.", e);
}

// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);

// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}

final YarnClientApplication yarnApplication = yarnClient.createApplication();
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> effectiveShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
effectiveShipFiles.add(file.getAbsoluteFile());
Expand Down Expand Up @@ -596,8 +617,8 @@ protected YarnClusterClient deployInternal() throws Exception {
effectiveShipFiles.addAll(userJarFiles);
}


// Set-up ApplicationSubmissionContext for the application
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

final ApplicationId appId = appContext.getApplicationId();

Expand Down Expand Up @@ -694,6 +715,27 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);

// write job graph to tmp file and add it to local resource
// TODO: need refine ?
if (jobGraph != null) {
try {
File fp = new File("/tmp/jobgraph-" + appId.toString());
FileOutputStream input = new FileOutputStream(fp);
ObjectOutputStream obInput = new ObjectOutputStream(input);
obInput.writeObject(jobGraph);
input.close();
LocalResource jobgraph = Records.newRecord(LocalResource.class);
Path remoteJobGraph =
Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
localResources.put("job.graph", jobgraph);
paths.add(remoteJobGraph);
classPathBuilder.append("job.graph").append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}

sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");

FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
Expand Down Expand Up @@ -835,7 +877,7 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
case FINISHED:
case FINISHED: //TODO: the finished state may be valid in flip-6
case KILLED:
throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ appState + " during deployment. \n" +
Expand Down Expand Up @@ -871,16 +913,7 @@ public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttrib
} catch (IllegalStateException e) {
// we're already in the shut down hook.
}

String host = report.getHost();
int port = report.getRpcPort();

// Correctly initialize the Flink config
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);

// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
return report;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
 * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;

/**
* Java representation of a running Flink job on YARN.
* Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster,
* so this class will be used as a client to communicate with yarn and start the job on yarn.
*/
public class YarnClusterClientV2 extends ClusterClient {

private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClientV2.class);

private YarnClient yarnClient;

private final AbstractYarnClusterDescriptor clusterDescriptor;

private ApplicationId appId;

private String trackingURL;

/**
* Create a client to communicate with YARN cluster.
*
* @param clusterDescriptor The descriptor used to create yarn job
* @param flinkConfig Flink configuration
* @throws java.io.IOException
*/
public YarnClusterClientV2(
final AbstractYarnClusterDescriptor clusterDescriptor,
org.apache.flink.configuration.Configuration flinkConfig) throws IOException {

super(flinkConfig);

this.clusterDescriptor = clusterDescriptor;
this.yarnClient = clusterDescriptor.getYarnClient();
this.trackingURL = "";
}

@Override
public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
return flinkConfig;
}

@Override
public int getMaxSlots() {
// Now need not set max slot
return 0;
}

@Override
public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
return clusterDescriptor.hasUserJarFiles(userJarFiles);
}

@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
try {
// Create application via yarnClient
ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient);
if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
appId = report.getApplicationId();
trackingURL = report.getTrackingUrl();
logAndSysout("Please refer to " + getWebInterfaceURL()
+ " for the running status of job " + jobGraph.getJobID().toString());
//TODO: not support attach mode now
return new JobSubmissionResult(jobGraph.getJobID());
}
else {
throw new ProgramInvocationException("Fail to submit the job.");
}
}
catch (Exception e) {
throw new ProgramInvocationException("Fail to submit the job", e.getCause());
}
}

@Override
public String getWebInterfaceURL() {
// there seems to be a difference between HD 2.2.0 and 2.6.0
if(!trackingURL.startsWith("http:https://")) {
return "http:https://" + trackingURL;
} else {
return trackingURL;
}
}

@Override
public String getClusterIdentifier() {
return "Yarn cluster with application id " + getApplicationId();
}

/**
* This method is only available if the cluster hasn't been started in detached mode.
*/
@Override
public GetClusterStatusResponse getClusterStatus() {
throw new UnsupportedOperationException("Not support getClusterStatus since Flip-6.");
}

public ApplicationStatus getApplicationStatus() {
//TODO: this method is useful for later
return null;
}

@Override
public List<String> getNewMessages() {
throw new UnsupportedOperationException("Not support getNewMessages since Flip-6.");
}

@Override
public void finalizeCluster() {
throw new UnsupportedOperationException("Not support finalizeCluster since Flip-6.");
}

@Override
public boolean isDetached() {
return super.isDetached() || clusterDescriptor.isDetachedMode();
}

@Override
public void waitForClusterToBeReady() {
throw new UnsupportedOperationException("Not support waitForClusterToBeReady since Flip-6.");
}

@Override
public InetSocketAddress getJobManagerAddress() {
//TODO: just return a local address in order to be compatible with createClient in CliFrontend
return new InetSocketAddress("localhost", 0);
}

public ApplicationId getApplicationId() {
return appId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
 * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;


/**
* Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
* This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
* to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor<YarnClusterClientV2>.
* However, in order to use the code in AbstractYarnClusterDescriptor for setting environments and so on, we make YarnClusterDescriptorV2 as now.
*/
public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {

@Override
protected Class<?> getApplicationMasterClass() {
return YarnFlinkApplicationMasterRunner.class;
}

}
Loading

0 comments on commit 3695a8e

Please sign in to comment.