Skip to content

Commit

Permalink
[FLINK-8872][flip6] fix yarn detached mode command parsing
Browse files Browse the repository at this point in the history
The detached flag if given by "-yd" was not passed correctly into the
CliFrontend and resulted in the CLI waiting for submitted jobs to finish instead
of detaching from the execution.

[FLINK-8872][yarn] add tests for YARN detached mode command line parsing with CliFrontend

- create a test-jar of flink-clients
- create CliFrontendRunWithYarnTest based on CliFrontendRunTest that verifies
  CliFrontend's parsing in conjunction with FlinkYarnSessionCli
-> verify detached mode in this test (can be extended further in the future)

This closes apache#5672.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Apr 13, 2018
1 parent ca5573b commit fdd1c6e
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 16 deletions.
14 changes: 13 additions & 1 deletion flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ under the License.
-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
Expand All @@ -120,7 +132,7 @@ under the License.
</execution>
</executions>
</plugin>
<!--Remove the external jar test code from the test-classes directory since it musn't be in the
<!--Remove the external jar test code from the test-classes directory since it mustn't be in the
classpath when running the tests to actually test whether the user code class loader
is properly used.-->
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class CliFrontendParser {
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");

/**
* @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
*/
@Deprecated
public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " +
"the job in detached mode (deprecated; use non-YARN specific option instead)");

static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

Expand Down Expand Up @@ -117,6 +124,7 @@ public class CliFrontendParser {

LOGGING_OPTION.setRequired(false);
DETACHED_OPTION.setRequired(false);
YARN_DETACHED_OPTION.setRequired(false);

ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
Expand Down Expand Up @@ -158,6 +166,7 @@ private static Options getProgramSpecificOptions(Options options) {
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public DefaultCLI(Configuration configuration) {

@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;

/**
* Base class for command line options that refer to a JAR file program.
Expand Down Expand Up @@ -112,7 +113,8 @@ else if (args.length > 0) {
}

stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
YARN_DETACHED_OPTION.getOpt());

if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public JobSubmissionResult run(PackagedProgram prog, int parallelism)
return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
log.info("Starting program in interactive mode");
log.info("Starting program in interactive mode (detached: {})", isDetached());

final List<URL> libraries;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void shutdown() {

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job {}.", jobGraph.getJobID());
log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testParallelismWithOverflow() throws Exception {

// --------------------------------------------------------------------------------------------

private static void verifyCliFrontend(
public static void verifyCliFrontend(
AbstractCustomCommandLine<?> cli,
String[] parameters,
int expectedParallelism,
Expand Down
8 changes: 8 additions & 0 deletions flink-yarn-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Needed for the streaming wordcount example -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.client.cli.CliFrontendTestBase;
import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.util.FakeClusterClient;
import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor;

import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;

/**
* Tests for the RUN command using a {@link org.apache.flink.yarn.cli.FlinkYarnSessionCli} inside
* the {@link org.apache.flink.client.cli.CliFrontend}.
*
* @see org.apache.flink.client.cli.CliFrontendRunTest
*/
@RunWith(Parameterized.class)
public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {

@Rule
public TemporaryFolder tmp = new TemporaryFolder();

@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}

@AfterClass
public static void shutdown() {
CliFrontendTestUtils.restoreSystemOut();
}

@Test
public void testRun() throws Exception {
String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();

Configuration configuration = new Configuration();
configuration.setString(CoreOptions.MODE, mode);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 8081);

FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli(
configuration,
tmp.getRoot().getAbsolutePath(),
"y",
"yarn");

// test detached mode
{
String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-d", testJarPath};
verifyCliFrontend(yarnCLI, parameters, 2, true, true);
}

// test detached mode
{
String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-yd", testJarPath};
verifyCliFrontend(yarnCLI, parameters, 2, true, true);
}
}

private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli {
@SuppressWarnings("unchecked")
private final ClusterClient<ApplicationId> clusterClient;
private final String configurationDirectory;

private TestingFlinkYarnSessionCli(
Configuration configuration,
String configurationDirectory,
String shortPrefix,
String longPrefix) throws Exception {
super(configuration, configurationDirectory, shortPrefix, longPrefix);

this.clusterClient = new FakeClusterClient(configuration);
this.configurationDirectory = configurationDirectory;
}

@Override
public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine)
throws FlinkException {
AbstractYarnClusterDescriptor parent = super.createClusterDescriptor(commandLine);
return new NonDeployingDetachedYarnClusterDescriptor(
parent.getFlinkConfiguration(),
(YarnConfiguration) parent.getYarnClient().getConfig(),
configurationDirectory,
parent.getYarnClient(),
clusterClient);
}
}

private static class NonDeployingDetachedYarnClusterDescriptor extends NonDeployingYarnClusterDescriptor {

NonDeployingDetachedYarnClusterDescriptor(
Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration, String configurationDirectory,
YarnClient yarnClient,
ClusterClient<ApplicationId> clusterClient) {
super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient,
clusterClient);
}

@Override
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
assertTrue(detached);
return super.deployJobCluster(clusterSpecification, jobGraph, true);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.util;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;

import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.net.URL;
import java.util.List;

/**
* Dummy {@link ClusterClient} for testing purposes (extend as needed).
*/
public class FakeClusterClient extends ClusterClient<ApplicationId> {

public FakeClusterClient(Configuration flinkConfig) throws Exception {
super(flinkConfig);
}

@Override
public void waitForClusterToBeReady() {
}

@Override
public String getWebInterfaceURL() {
return "";
}

@Override
public GetClusterStatusResponse getClusterStatus() {
throw new UnsupportedOperationException("Not needed in test.");
}

@Override
public List<String> getNewMessages() {
throw new UnsupportedOperationException("Not needed in test.");
}

@Override
public ApplicationId getClusterId() {
throw new UnsupportedOperationException("Not needed in test.");
}

@Override
public int getMaxSlots() {
return 10;
}

@Override
public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
return false;
}

@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) {
throw new UnsupportedOperationException("Not needed in test.");
}
}
Loading

0 comments on commit fdd1c6e

Please sign in to comment.