[FLINK-17632][yarn] Always build the packaged program in the cluster for application mode
wangyang0918 authored and kl0u committed May 15, 2020
1 parent cd1f858 commit 04bcc25
Showing 5 changed files with 169 additions and 27 deletions.
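With this change, the client-side runApplication path in the first hunk below (the code behind the run-application action) no longer builds the PackagedProgram locally when the user jar happens to be a local file. The resolved jar URI is always recorded in pipeline.jars, and the packaged program is constructed inside the cluster by the YARN application entrypoint. The sketch below condenses the deployment path that the new YARNApplicationITCase (added in this commit) exercises; the jar URI, memory sizes, and the createYarnClusterDescriptor(...) helper (a test-base utility) are illustrative assumptions, not part of this commit.

// Rough sketch of an application-mode deployment against YARN, condensed from the
// YARNApplicationITCase added below. The jar URI, memory sizes and the
// createYarnClusterDescriptor(...) helper are illustrative only.
Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
configuration.set(PipelineOptions.JARS, Collections.singletonList("hdfs:///jars/my-job.jar"));
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));

try (YarnClusterDescriptor descriptor = createYarnClusterDescriptor(configuration)) {
    final ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(768)
            .setTaskManagerMemoryMB(1024)
            .setSlotsPerTaskManager(1)
            .createClusterSpecification();
    try (ClusterClient<ApplicationId> clusterClient = descriptor
            .deployApplicationCluster(spec, ApplicationConfiguration.fromConfiguration(configuration))
            .getClusterClient()) {
        ApplicationId applicationId = clusterClient.getClusterId();  // poll or wait on the YARN application here
    }
}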
@@ -188,29 +188,11 @@ protected void runApplication(String[] args) throws Exception {

programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
// Build the packaged program when the user jar is a local file
if (uri.getScheme().equals("file")) {
final PackagedProgram program =
getPackagedProgram(programOptions);

final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(program.getArguments(), program.getMainClassName());

try {
final List<URL> jobJars = program.getJobJarAndDependencies();
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, jobJars);
deployer.run(effectiveConfiguration, applicationConfiguration);
} finally {
program.deleteExtractedLibraries();
}
} else {
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, Collections.singletonList(uri.toString()));
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}
final Configuration effectiveConfiguration =
getEffectiveConfiguration(commandLine, programOptions, Collections.singletonList(uri.toString()));
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
}

/**
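The hunk above removes the client-side special case: previously a file:// user jar caused the client to build the PackagedProgram (and later delete its extracted libraries) before deployment, while any other scheme skipped that step. After this change both cases collapse into one path, and the only thing the client contributes is the jar URI in the effective configuration, roughly as in this sketch (the path is a placeholder):

// Sketch only, not the exact client code: local (file://) and remote (e.g. hdfs://)
// user jars are now handled identically -- the URI ends up in pipeline.jars and the
// packaged program is built later, inside the cluster.
final Configuration effectiveConfiguration = new Configuration();
effectiveConfiguration.set(
        PipelineOptions.JARS,
        Collections.singletonList("file:///tmp/my-job.jar"));  // placeholder; an hdfs:// URI works the same way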
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.Collections;

import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;

/**
* Test cases for the deployment of Yarn Flink application clusters.
*/
public class YARNApplicationITCase extends YarnTestBase {

private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(30);
private static final int sleepIntervalInMS = 100;

@BeforeClass
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-application");
startYARNWithConfig(YARN_CONFIGURATION, true);
}

@Test
public void testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion() throws Exception {
runTest(
() -> deployApplication(
createDefaultConfiguration(getTestingJar(), YarnConfigOptions.UserJarInclusion.FIRST)));
}

@Test
public void testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion() throws Exception {
runTest(
() -> deployApplication(
createDefaultConfiguration(getTestingJar(), YarnConfigOptions.UserJarInclusion.DISABLED)));
}

private void deployApplication(Configuration configuration) throws Exception {
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(configuration)) {

final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(masterMemory)
.setTaskManagerMemoryMB(1024)
.setSlotsPerTaskManager(1)
.createClusterSpecification();

try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor
.deployApplicationCluster(
clusterSpecification,
ApplicationConfiguration.fromConfiguration(configuration))
.getClusterClient()) {

ApplicationId applicationId = clusterClient.getClusterId();

waitApplicationFinishedElseKillIt(
applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
}
}
}

private Path getTestingJar() throws FileNotFoundException {
return new Path(getTestJarPath("StreamingWordCount.jar").toURI());
}

private Configuration createDefaultConfiguration(Path userJar, YarnConfigOptions.UserJarInclusion userJarInclusion) {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());
configuration.set(PipelineOptions.JARS, Collections.singletonList(userJar.toString()));

return configuration;
}
}
@@ -403,6 +403,9 @@ public ClusterClientProvider<ApplicationId> deployApplicationCluster(

applicationConfiguration.applyToConfiguration(flinkConfiguration);

final List<String> pipelineJars = flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
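deployApplicationCluster now rejects a submission up front unless exactly one user jar is configured; the new YarnClusterDescriptorTest cases further down cover both a wrong deployment.target and a multi-jar pipeline.jars. Roughly (the paths are placeholders):

// Passes the new precondition: exactly one entry in pipeline.jars.
flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));

// Fails the precondition ("Should only have one jar"): more than one entry is
// rejected before anything is submitted to YARN.
flinkConfiguration.set(PipelineOptions.JARS, Arrays.asList("local:///user1.jar", "local:///user2.jar"));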

@@ -37,13 +37,17 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* An {@link ApplicationClusterEntryPoint} for Yarn.
@@ -118,9 +122,16 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
final Configuration configuration,
final String[] programArguments,
@Nullable final String jobClassName) throws IOException {

final List<File> pipelineJars = configuration.get(PipelineOptions.JARS).stream()
.map(uri -> new File(YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null), new Path(uri).getName()))
.collect(Collectors.toList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");

final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
ClassPathPackagedProgramRetriever
.newBuilder(programArguments)
.setJarFile(pipelineJars.get(0))
.setJobClassName(jobClassName);
YarnEntrypointUtils.getUsrLibDir(configuration).ifPresent(retrieverBuilder::setUserLibDirectory);
return retrieverBuilder.build();
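On the cluster side, getPackagedProgramRetriever now points the retriever at a concrete jar file: the single pipeline.jars entry is reduced to its file name and resolved against the user-lib directory when one is shipped. With made-up values:

// Illustration only (all values are made up): how the user-jar File handed to the
// retriever is derived from the single pipeline.jars entry.
String pipelineJarUri = "hdfs:///jars/my-job.jar";       // the lone pipeline.jars entry
String jarName = new Path(pipelineJarUri).getName();     // "my-job.jar" (org.apache.hadoop.fs.Path)
File userJar = new File("/opt/flink/usrlib", jarName);   // parent dir comes from getUsrLibDir(configuration), when present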
@@ -20,16 +20,20 @@

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;

import org.apache.hadoop.fs.Path;
@@ -49,6 +53,7 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -57,6 +62,7 @@
import java.util.Set;

import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static org.hamcrest.Matchers.containsString;
@@ -74,6 +80,11 @@ public class YarnClusterDescriptorTest extends TestLogger {

private static YarnClient yarnClient;

private final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setSlotsPerTaskManager(Integer.MAX_VALUE)
.createClusterSpecification();
private final ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[0], null);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@@ -106,10 +117,6 @@ public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentExc

clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));

ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setSlotsPerTaskManager(Integer.MAX_VALUE)
.createClusterSpecification();

try {
clusterDescriptor.deploySessionCluster(clusterSpecification);

@@ -517,6 +524,32 @@ public void testYarnClientShutDown() {
assertTrue(closableYarnClient.isInState(Service.STATE.STOPPED));
}

@Test
public void testDeployApplicationClusterWithDeploymentTargetNotCorrectlySet() {
final Configuration flinkConfig = new Configuration();
flinkConfig.set(PipelineOptions.JARS, Collections.singletonList("file:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName());
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(flinkConfig)) {
assertThrows(
"Expected deployment.target=yarn-application",
ClusterDeploymentException.class,
() -> yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig));
}
}

@Test
public void testDeployApplicationClusterWithMultipleJarsSet() {
final Configuration flinkConfig = new Configuration();
flinkConfig.set(PipelineOptions.JARS, Arrays.asList("local:///path/of/user.jar", "local:///user2.jar"));
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
try (final YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptor(flinkConfig)) {
assertThrows(
"Should only have one jar",
IllegalArgumentException.class,
() -> yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig));
}
}

private YarnClusterDescriptor createYarnClusterDescriptor() {
return createYarnClusterDescriptor(new Configuration());
}
