Skip to content

Commit

Permalink
[FLINK-16742][runtime] Refactor BashJavaUtils, move to separated pack…
Browse files Browse the repository at this point in the history
…age and extract configuration loading logics.
  • Loading branch information
xintongsong authored and azagrebin committed Apr 27, 2020
1 parent a7923fd commit 0f5132d
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 50 deletions.
4 changes: 2 additions & 2 deletions flink-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ under the License.
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>org/apache/flink/runtime/util/BashJavaUtils.class</exclude>
<exclude>org/apache/flink/runtime/util/bash/BashJavaUtils.class</exclude>
</excludes>
</filter>
</filters>
Expand Down Expand Up @@ -769,7 +769,7 @@ under the License.
<filter>
<artifact>org.apache.flink:*</artifact>
<includes>
<include>org/apache/flink/runtime/util/BashJavaUtils.class</include>
<include>org/apache/flink/runtime/util/bash/BashJavaUtils.class</include>
</includes>
</filter>
</filters>
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ runBashJavaUtilsCmd() {
local dynamic_args=${@:4}
class_path=`manglePathList ${class_path}`

local output=`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} $dynamic_args 2>&1 | tail -n 1000`
local output=`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir ${conf_dir} $dynamic_args 2>&1 | tail -n 1000`
if [[ $? -ne 0 ]]; then
echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
# Print the output in case the user redirect the log to console.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.util.BashJavaUtils;
import org.apache.flink.runtime.util.bash.BashJavaUtils;

import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.runtime.util.bash;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.Options;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -77,7 +70,7 @@ private static void getTmResourceParams(String[] args) throws Exception {
}

private static Configuration getConfigurationForStandaloneTaskManagers(String[] args) throws Exception {
Configuration configuration = loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
return TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY);
}
Expand All @@ -89,39 +82,11 @@ private static void getJmResourceParams(String[] args) throws Exception {
}

private static Configuration getConfigurationForStandaloneJobManager(String[] args) throws Exception {
Configuration configuration = loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
return JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
configuration, JobManagerOptions.TOTAL_FLINK_MEMORY);
}

@VisibleForTesting
static Configuration loadConfiguration(String[] args) throws FlinkException {
return ConfigurationParserUtils.loadCommonConfiguration(
filterCmdArgs(args),
BashJavaUtils.class.getSimpleName());
}

private static String[] filterCmdArgs(String[] args) {
final Options options = ClusterConfigurationParserFactory.options();
final List<String> filteredArgs = new ArrayList<>();
final Iterator<String> iter = Arrays.asList(args).iterator();

while (iter.hasNext()) {
String token = iter.next();
if (options.hasOption(token)) {
filteredArgs.add(token);
if (options.getOption(token).hasArg() && iter.hasNext()) {
filteredArgs.add(iter.next());
}
} else if (token.startsWith("-D")) {
// "-Dkey=value"
filteredArgs.add(token);
}
}

return filteredArgs.toArray(new String[0]);
}

/**
* Commands that BashJavaUtils supports.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util.bash;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.Options;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
* Util class for loading configuration from commandline arguments.
* It parses only the configuration file and dynamic properties, ignores other commandline options.
*/
public class FlinkConfigLoader {

private static final Options CMD_OPTIONS = ClusterConfigurationParserFactory.options();

public static Configuration loadConfiguration(String[] args) throws FlinkException {
return ConfigurationParserUtils.loadCommonConfiguration(
filterCmdArgs(args),
BashJavaUtils.class.getSimpleName());
}

private static String[] filterCmdArgs(String[] args) {
final List<String> filteredArgs = new ArrayList<>();
final Iterator<String> iter = Arrays.asList(args).iterator();

while (iter.hasNext()) {
String token = iter.next();
if (CMD_OPTIONS.hasOption(token)) {
filteredArgs.add(token);
if (CMD_OPTIONS.getOption(token).hasArg() && iter.hasNext()) {
filteredArgs.add(iter.next());
}
} else if (token.startsWith("-D")) {
// "-Dkey=value"
filteredArgs.add(token);
}
}

return filteredArgs.toArray(new String[0]);
}

private FlinkConfigLoader() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.runtime.util.bash;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
Expand All @@ -34,9 +34,9 @@
import static org.hamcrest.MatcherAssert.assertThat;

/**
* Tests for {@link BashJavaUtils}.
* Tests for {@link FlinkConfigLoader}.
*/
public class BashJavaUtilsTest extends TestLogger {
public class FlinkConfigLoaderTest extends TestLogger {

private static final String TEST_CONFIG_KEY = "test.key";
private static final String TEST_CONFIG_VALUE = "test_value";
Expand All @@ -56,35 +56,35 @@ protected void before() throws Throwable {
@Test
public void testLoadConfigurationConfigDirLongOpt() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath()};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}

@Test
public void testLoadConfigurationConfigDirShortOpt() throws Exception {
String[] args = {"-c", confDir.getRoot().getAbsolutePath()};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
}

@Test
public void testLoadConfigurationDynamicPropertyWithSpace() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-D", "key=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}

@Test
public void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception {
String[] args = {"--configDir", confDir.getRoot().getAbsolutePath(), "-Dkey=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, "key", "value");
}

@Test
public void testLoadConfigurationIgnoreUnknownToken() throws Exception {
String [] args = {"unknown", "-u", "--configDir", confDir.getRoot().getAbsolutePath(), "--unknown", "-Dkey=value"};
Configuration configuration = BashJavaUtils.loadConfiguration(args);
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
verifyConfiguration(configuration, "key", "value");
}
Expand Down

0 comments on commit 0f5132d

Please sign in to comment.