Skip to content

Commit

Permalink
[FLINK-16742][runtime] Ignore unknown command line options for BashJa…
Browse files Browse the repository at this point in the history
…vaUtils.
  • Loading branch information
xintongsong authored and azagrebin committed Apr 27, 2020
1 parent 38b341b commit e7d68f1
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@
*/
public class ClusterConfigurationParserFactory implements ParserResultFactory<ClusterConfiguration> {

@Override
public Options getOptions() {
public static Options options() {
final Options options = new Options();
options.addOption(CONFIG_DIR_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);

return options;
}

@Override
public Options getOptions() {
return options();
}

@Override
public ClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
Expand All @@ -34,10 +32,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
Expand All @@ -55,6 +50,7 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
Expand Down Expand Up @@ -292,20 +288,7 @@ public static void main(String[] args) throws Exception {
}

public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());

final ClusterConfiguration clusterConfiguration;

try {
clusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
LOG.error("Could not parse the command line options.", e);
commandLineParser.printHelp(TaskManagerRunner.class.getSimpleName());
throw e;
}

final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties);
return ConfigurationParserUtils.loadCommonConfiguration(args, TaskManagerRunner.class.getSimpleName());
}

public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.Options;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -63,11 +69,39 @@ private static void getTmResourceParams(String[] args) throws Exception {
}

private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {
Configuration configuration = TaskManagerRunner.loadConfiguration(args);
Configuration configuration = loadConfiguration(args);
return TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY);
}

@VisibleForTesting
static Configuration loadConfiguration(String[] args) throws FlinkException {
return ConfigurationParserUtils.loadCommonConfiguration(
filterCmdArgs(args),
BashJavaUtils.class.getSimpleName());
}

private static String[] filterCmdArgs(String[] args) {
final Options options = ClusterConfigurationParserFactory.options();
final List<String> filteredArgs = new ArrayList<>();
final Iterator<String> iter = Arrays.asList(args).iterator();

while (iter.hasNext()) {
String token = iter.next();
if (options.hasOption(token)) {
filteredArgs.add(token);
if (options.getOption(token).hasArg() && iter.hasNext()) {
filteredArgs.add(iter.next());
}
} else if (token.startsWith("-D")) {
// "-Dkey=value"
filteredArgs.add(token);
}
}

return filteredArgs.toArray(new String[0]);
}

/**
* Commands that BashJavaUtils supports.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,20 @@
package org.apache.flink.runtime.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.MathUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.util.MathUtils.checkedDownCast;

/**
Expand All @@ -32,6 +41,8 @@
*/
public class ConfigurationParserUtils {

private static final Logger LOG = LoggerFactory.getLogger(ConfigurationParserUtils.class);

/**
* Parses the configuration to get the number of slots and validates the value.
*
Expand Down Expand Up @@ -95,4 +106,28 @@ public static int getPageSize(Configuration configuration) {

return pageSize;
}

/**
* Generate configuration from only the config file and dynamic properties.
* @param args the commandline arguments
* @param cmdLineSyntax the syntax for this application
* @return generated configuration
* @throws FlinkParseException if the configuration cannot be generated
*/
public static Configuration loadCommonConfiguration(String[] args, String cmdLineSyntax) throws FlinkParseException {
final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());

final ClusterConfiguration clusterConfiguration;

try {
clusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
LOG.error("Could not parse the command line options.", e);
commandLineParser.printHelp(cmdLineSyntax);
throw e;
}

final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileWriter;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* Tests for {@link BashJavaUtils}.
*/
public class BashJavaUtilsTest extends TestLogger {

private static final String TEST_CONFIG_KEY = "test.key";
private static final String TEST_CONFIG_VALUE = "test_value";

@Rule
public TemporaryFolder confDir = new TemporaryFolder() {
@Override
protected void before() throws Throwable {
super.create();
File flinkConfFile = newFile("flink-conf.yaml");
FileWriter fw = new FileWriter(flinkConfFile);
fw.write(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE + "\n");
fw.close();
}
};

@Test
public void testLoadConfigurationConfigDirLongOpt() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath()};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}

@Test
public void testLoadConfigurationConfigDirShortOpt() throws Exception {
String[] args = {"-c", confDir.getRoot().getAbsolutePath()};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}

@Test
public void testLoadConfigurationDynamicPropertyWithSpace() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-D", "key=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}

@Test
public void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-Dkey=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}

@Test
public void testLoadConfigurationIgnoreUnknownToken() throws Exception {
String [] args = {"unknown", "-u", "--configDir", confDir.getRoot().getAbsolutePath(), "--unknown", "-Dkey=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
verifyConfiguration(configuration, "key", "value");
}

private void verifyConfiguration(Configuration config, String key, String expectedValue) {
ConfigOption<String> option = key(key).stringType().noDefaultValue();
assertThat(config.get(option), is(expectedValue));
}
}

0 comments on commit e7d68f1

Please sign in to comment.