From 8f467c1e9727d5a86d38d0b49753c534a1a161da Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 9 Jul 2018 23:54:55 +0200 Subject: [PATCH] [FLINK-9488] Add container entry point StandaloneJobClusterEntryPoint The StandaloneJobClusterEntryPoint is the basic entry point for containers. It is started with the user code jar in its classpath and the classname of the user program. The entrypoint will then load this user program via the classname and execute its main method. This will generate a JobGraph which is then used to start the MiniDispatcher. This closes #6315. --- flink-container/pom.xml | 69 ++++++++ .../StandaloneJobClusterConfiguration.java | 43 +++++ ...eJobClusterConfigurationParserFactory.java | 75 +++++++++ .../StandaloneJobClusterEntryPoint.java | 156 ++++++++++++++++++ .../src/main/resources/log4j.properties | 27 +++ ...ClusterConfigurationParserFactoryTest.java | 84 ++++++++++ .../StandaloneJobClusterEntryPointTest.java | 53 ++++++ .../flink/container/entrypoint/TestJob.java | 40 +++++ .../src/test/resources/log4j-test.properties | 32 ++++ ...ClusterConfigurationParserFactoryTest.java | 1 - pom.xml | 1 + 11 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 flink-container/pom.xml create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java create mode 100644 flink-container/src/main/resources/log4j.properties create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java create mode 100644 flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java create mode 100644 flink-container/src/test/resources/log4j-test.properties diff --git a/flink-container/pom.xml b/flink-container/pom.xml new file mode 100644 index 0000000000000..b20d32117037d --- /dev/null +++ b/flink-container/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + org.apache.flink + flink-parent + 1.6-SNAPSHOT + .. + + + flink-container_${scala.binary.version} + flink-container + jar + + + + + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + provided + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + provided + + + + + + org.apache.flink + flink-test-utils-junit + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test + + + + diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java new file mode 100644 index 0000000000000..e68e74b80a402 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +/** + * Configuration for the {@link StandaloneJobClusterEntryPoint}. + */ +final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration { + @Nonnull + private final String jobClassName; + + public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) { + super(configDir, dynamicProperties, args, restPort); + this.jobClassName = jobClassName; + } + + @Nonnull + String getJobClassName() { + return jobClassName; + } +} diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java new file mode 100644 index 0000000000000..c0cb473972594 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION; + +/** + * Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given + * list of command line arguments. + */ +public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory { + + private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") + .longOpt("job-classname") + .required(true) + .hasArg(true) + .argName("job class name") + .desc("Class name of the job to run.") + .build(); + + @Override + public Options getOptions() { + final Options options = new Options(); + options.addOption(CONFIG_DIR_OPTION); + options.addOption(REST_PORT_OPTION); + options.addOption(JOB_CLASS_NAME_OPTION); + options.addOption(DYNAMIC_PROPERTY_OPTION); + + return options; + } + + @Override + public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) { + final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); + final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); + final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1"); + final int restPort = Integer.parseInt(restPortString); + final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); + + return new StandaloneJobClusterConfiguration( + configDir, + dynamicProperties, + commandLine.getArgs(), + restPort, + jobClassName); + } +} diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java new file mode 100644 index 0000000000000..47cca4c7d8509 --- /dev/null +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * {@link JobClusterEntrypoint} which is started with a job in a predefined + * location. + */ +public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { + + private static final String[] EMPTY_ARGS = new String[0]; + + @Nonnull + private final String jobClassName; + + StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) { + super(configuration); + this.jobClassName = jobClassName; + } + + @Override + protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException { + final PackagedProgram packagedProgram = createPackagedProgram(); + final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); + try { + final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism); + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; + } catch (Exception e) { + throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e); + } + } + + private PackagedProgram createPackagedProgram() throws FlinkException { + try { + final Class mainClass = getClass().getClassLoader().loadClass(jobClassName); + return new PackagedProgram(mainClass, EMPTY_ARGS); + } catch (ClassNotFoundException | ProgramInvocationException e) { + throw new FlinkException("Could not load the provied entrypoint class.", e); + } + } + + @Override + protected void registerShutdownActions(CompletableFuture terminationFuture) { + terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true)); + } + + @Override + protected ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl) throws Exception { + final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration); + final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + resourceManagerRuntimeServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor()); + + return new StandaloneResourceManager( + rpcService, + ResourceManager.RESOURCE_MANAGER_NAME, + resourceId, + resourceManagerConfiguration, + highAvailabilityServices, + heartbeatServices, + resourceManagerRuntimeServices.getSlotManager(), + metricRegistry, + resourceManagerRuntimeServices.getJobLeaderIdService(), + clusterInformation, + fatalErrorHandler); + } + + public static void main(String[] args) { + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + StandaloneJobClusterConfiguration clusterConfiguration = null; + + try { + clusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(); + System.exit(1); + } + + Configuration configuration = loadConfiguration(clusterConfiguration); + + configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString()); + + StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName()); + + entrypoint.startCluster(); + } + +} diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties new file mode 100644 index 0000000000000..62cb6ed9cca57 --- /dev/null +++ b/flink-container/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# Convenience file for local debugging of the JobManager/TaskManager. +log4j.rootLogger=OFF, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +log4j.logger.org.apache.flink.mesos=DEBUG +log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java new file mode 100644 index 0000000000000..1f39a0609e702 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}. + */ +public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger { + + private static final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory()); + + @Test + public void testEntrypointClusterConfigurationParsing() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final int restPort = 1234; + final String jobClassName = "foobar"; + final String arg1 = "arg1"; + final String arg2 = "arg2"; + final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort))); + final Properties dynamicProperties = clusterConfiguration.getDynamicProperties(); + + assertThat(dynamicProperties, hasEntry(key, value)); + + assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2)); + } + + @Test + public void testOnlyRequiredArguments() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String jobClassName = "foobar"; + final String[] args = {"--configDir", configDir, "--job-classname", jobClassName}; + + final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); + + assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); + assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + } + + @Test(expected = FlinkParseException.class) + public void testMissingRequiredArgument() throws FlinkParseException { + final String[] args = {}; + + commandLineParser.parse(args); + } +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java new file mode 100644 index 0000000000000..360799d19a823 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link StandaloneJobClusterEntryPoint}. + */ +public class StandaloneJobClusterEntryPointTest extends TestLogger { + + @Test + public void testJobGraphRetrieval() throws FlinkException { + final Configuration configuration = new Configuration(); + final int parallelism = 42; + configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); + final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint( + configuration, + TestJob.class.getCanonicalName()); + + final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration); + + assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName()))); + assertThat(jobGraph.getMaximumParallelism(), is(parallelism)); + } + +} diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java new file mode 100644 index 0000000000000..5f8857fc35fb4 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +/** + * Test job which is used for {@link StandaloneJobClusterEntryPointTest}. + */ +public class TestJob { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource source = env.fromElements(1, 2, 3, 4); + final SingleOutputStreamOperator mapper = source.map(element -> 2 * element); + mapper.addSink(new DiscardingSink<>()); + + env.execute(TestJob.class.getCanonicalName()); + } +} diff --git a/flink-container/src/test/resources/log4j-test.properties b/flink-container/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..b85f2f20239bf --- /dev/null +++ b/flink-container/src/test/resources/log4j-test.properties @@ -0,0 +1,32 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console + +# log whats going on between the tests +log4j.logger.org.apache.flink.runtime.leaderelection=OFF +log4j.logger.org.apache.flink.runtime.leaderretrieval=OFF + diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java index 7447439b26300..62da39e592550 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java @@ -73,5 +73,4 @@ public void testMissingRequiredArgument() throws FlinkParseException { commandLineParser.parse(args); } - } diff --git a/pom.xml b/pom.xml index 897ae3cf9e74a..1f35cd44892ce 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ under the License. flink-formats flink-examples flink-clients + flink-container flink-queryable-state flink-tests flink-end-to-end-tests