Skip to content

Commit

Permalink
[FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Aug 5, 2022
1 parent f806bb7 commit ff2f4cb
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 15 deletions.
22 changes: 17 additions & 5 deletions flink-dist/src/main/flink-bin/bin/jobmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port] [args])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [ -z $2 ] || [[ $2 == "-D" ]]; then
# start [-D ...]
args=("${@:2}")
elif [ -z $3 ] || [[ $3 == "-D" ]]; then
# start <host> [-D ...]
HOST=$2
args=("${@:3}")
else
# start <host> <port> [-D ...]
HOST=$2
WEBUIPORT=$3
args=("${@:4}")
fi

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
Expand All @@ -41,7 +53,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
parseJmArgsAndExportLogs "${ARGS[@]}"

args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "${args[@]}")
if [ ! -z $HOST ]; then
args+=("--host")
args+=("${HOST}")
Expand All @@ -53,7 +65,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
fi

if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
args+=(${DYNAMIC_PARAMETERS[@]})
args=(${DYNAMIC_PARAMETERS[@]} "${args[@]}")
fi
fi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand All @@ -70,7 +71,7 @@
import java.util.stream.Stream;

/** A wrapper around a Flink distribution. */
final class FlinkDistribution {
public final class FlinkDistribution {

private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);

Expand All @@ -88,7 +89,7 @@ final class FlinkDistribution {

private final Configuration defaultConfig;

FlinkDistribution(Path distributionDir) {
public FlinkDistribution(Path distributionDir) {
bin = distributionDir.resolve("bin");
opt = distributionDir.resolve("opt");
lib = distributionDir.resolve("lib");
Expand All @@ -103,14 +104,33 @@ final class FlinkDistribution {

public void startJobManager() throws IOException {
LOG.info("Starting Flink JobManager.");
AutoClosableProcess.runBlocking(
bin.resolve("jobmanager.sh").toAbsolutePath().toString(), "start");
internalCallJobManagerScript("start");
}

public void callJobManagerScript(String... args) throws IOException {
LOG.info("Calling Flink JobManager script with {}.", Arrays.toString(args));
internalCallJobManagerScript(args);
}

private void internalCallJobManagerScript(String... args) throws IOException {
List<String> arguments = new ArrayList<>();
arguments.add(bin.resolve("jobmanager.sh").toAbsolutePath().toString());
arguments.addAll(Arrays.asList(args));
AutoClosableProcess.create(arguments.toArray(new String[0]))
// ignore the variable, we assume we log to the distribution directory
// and we copy the logs over in case of failure
.setEnv(env -> env.remove("FLINK_LOG_DIR"))
.runBlocking();
}

public void startTaskManager() throws IOException {
LOG.info("Starting Flink TaskManager.");
AutoClosableProcess.runBlocking(
bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
AutoClosableProcess.create(
bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start")
// ignore the variable, we assume we log to the distribution directory
// and we copy the logs over in case of failure
.setEnv(env -> env.remove("FLINK_LOG_DIR"))
.runBlocking();
}

public void startSqlGateway() throws IOException {
Expand Down Expand Up @@ -370,9 +390,9 @@ public void setTaskExecutorHosts(Collection<String> taskExecutorHosts) throws IO
Files.write(conf.resolve("workers"), taskExecutorHosts);
}

public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
public <T> Stream<T> searchAllLogs(Pattern pattern, Function<Matcher, T> matchProcessor)
throws IOException {
final List<String> matches = new ArrayList<>(2);
final List<T> matches = new ArrayList<>(2);

try (Stream<Path> logFilesStream = Files.list(log)) {
final Iterator<Path> logFiles = logFilesStream.iterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.dist;

import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.test.util.FileUtils;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.flink.FlinkDistribution;
import org.apache.flink.util.Preconditions;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

class DynamicParameterITCase {

private static final Pattern ENTRYPOINT_LOG_PATTERN =
Pattern.compile(".*ClusterEntrypoint +\\[] - +(.*)");
private static final Pattern ENTRYPOINT_CLASSPATH_LOG_PATTERN =
Pattern.compile(".*ClusterEntrypoint +\\[] - +Classpath:.*");

private static final String HOST = "localhost";
private static final int PORT = 8081;

private static final String DYNAMIC_KEY = "hello";
private static final String DYNAMIC_VALUE = "world";
private static final String DYNAMIC_PROPERTY = DYNAMIC_KEY + "=" + DYNAMIC_VALUE;

private static final Path originalDist = FileUtils.findFlinkDist();

private FlinkDistribution dist;

@BeforeEach
private void setup(@TempDir Path tmp) throws IOException {
TestUtils.copyDirectory(originalDist, tmp);
dist = new FlinkDistribution(tmp);
}

@AfterEach
private void cleanup() throws IOException {
if (dist != null) {
dist.stopFlinkCluster();
}
}

@Test
void testWithoutAnyParameter() throws Exception {
assertParameterPassing(dist, false, false, false);
}

@Test
void testWithHost() throws Exception {
assertParameterPassing(dist, true, false, false);
}

@Test
void testWithHostAndPort() throws Exception {
assertParameterPassing(dist, true, true, false);
}

@Test
void testWithDynamicParameter() throws Exception {
assertParameterPassing(dist, false, false, true);
}

@Test
void testWithDynamicParameterAndHost() throws Exception {
assertParameterPassing(dist, true, false, true);
}

@Test
void testWithDynamicParameterAndHostAndPort() throws Exception {
assertParameterPassing(dist, true, true, true);
}

private static void assertParameterPassing(
FlinkDistribution dist, boolean withHost, boolean withPort, boolean withDynamicProperty)
throws Exception {

final List<String> args = new ArrayList<>();
args.add("start");

if (withHost) {
args.add(HOST);
}
if (withPort) {
Preconditions.checkState(withHost, "port may only be supplied with a host");
args.add(String.valueOf(PORT));
}
if (withDynamicProperty) {
args.add("-D");
args.add(DYNAMIC_PROPERTY);
}

dist.callJobManagerScript(args.toArray(new String[0]));

while (!allProgramArgumentsLogged(dist)) {
Thread.sleep(500);
}

try (Stream<String> lines =
dist.searchAllLogs(ENTRYPOINT_LOG_PATTERN, matcher -> matcher.group(1))) {

final EntrypointClusterConfiguration entrypointConfig =
ClusterEntrypoint.parseArguments(
lines.filter(new ProgramArgumentsFilter()).toArray(String[]::new));

assertThat(entrypointConfig.getHostname()).isEqualTo(withHost ? HOST : null);
assertThat(entrypointConfig.getRestPort()).isEqualTo(withPort ? PORT : -1);

if (withDynamicProperty) {
assertThat(entrypointConfig.getDynamicProperties())
.containsEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
} else {
assertThat(entrypointConfig.getDynamicProperties())
.doesNotContainEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
}
} catch (FlinkParseException e) {
throw new RuntimeException(e);
}
}

private static boolean allProgramArgumentsLogged(FlinkDistribution dist) throws IOException {
// the classpath is logged after the program arguments
try (Stream<String> lines =
dist.searchAllLogs(ENTRYPOINT_CLASSPATH_LOG_PATTERN, matcher -> matcher.group(0))) {
return lines.iterator().hasNext();
}
}

private static class ProgramArgumentsFilter implements Predicate<String> {

private boolean inProgramArguments = false;

@Override
public boolean test(String s) {
if (s.contains("Program Arguments:")) {
inProgramArguments = true;
return false;
}
if (s.contains("Classpath:")) {
inProgramArguments = false;
}
return inProgramArguments;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ protected void cleanupDirectories(ShutdownBehaviour shutdownBehaviour) throws IO
protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;

protected static EntrypointClusterConfiguration parseArguments(String[] args)
public static EntrypointClusterConfiguration parseArguments(String[] args)
throws FlinkParseException {
final CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser =
new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
Expand Down
2 changes: 1 addition & 1 deletion tools/ci/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

rootLogger.level = INFO
rootLogger.level = debug
rootLogger.appenderRef.out.ref = FileAppender

# -----------------------------------------------------------------------------
Expand Down

0 comments on commit ff2f4cb

Please sign in to comment.