diff --git a/flink-container/pom.xml b/flink-container/pom.xml
new file mode 100644
index 0000000000000..b20d32117037d
--- /dev/null
+++ b/flink-container/pom.xml
@@ -0,0 +1,69 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.flink
+ flink-parent
+ 1.6-SNAPSHOT
+ ..
+
+
+ flink-container_${scala.binary.version}
+ flink-container
+ jar
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-runtime_${scala.binary.version}
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${project.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${project.version}
+ test
+
+
+
+
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
new file mode 100644
index 0000000000000..e68e74b80a402
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+
+import javax.annotation.Nonnull;
+
+import java.util.Properties;
+
+/**
+ * Configuration for the {@link StandaloneJobClusterEntryPoint}.
+ */
+final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
+ @Nonnull
+ private final String jobClassName;
+
+ public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) {
+ super(configDir, dynamicProperties, args, restPort);
+ this.jobClassName = jobClassName;
+ }
+
+ @Nonnull
+ String getJobClassName() {
+ return jobClassName;
+ }
+}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
new file mode 100644
index 0000000000000..c0cb473972594
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import javax.annotation.Nonnull;
+
+import java.util.Properties;
+
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
+
+/**
+ * Parser factory which generates a {@link StandaloneJobClusterConfiguration} from a given
+ * list of command line arguments.
+ */
+public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory {
+
+ private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
+ .longOpt("job-classname")
+ .required(true)
+ .hasArg(true)
+ .argName("job class name")
+ .desc("Class name of the job to run.")
+ .build();
+
+ @Override
+ public Options getOptions() {
+ final Options options = new Options();
+ options.addOption(CONFIG_DIR_OPTION);
+ options.addOption(REST_PORT_OPTION);
+ options.addOption(JOB_CLASS_NAME_OPTION);
+ options.addOption(DYNAMIC_PROPERTY_OPTION);
+
+ return options;
+ }
+
+ @Override
+ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
+ final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
+ final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
+ final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
+ final int restPort = Integer.parseInt(restPortString);
+ final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
+
+ return new StandaloneJobClusterConfiguration(
+ configDir,
+ dynamicProperties,
+ commandLine.getArgs(),
+ restPort,
+ jobClassName);
+ }
+}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
new file mode 100644
index 0000000000000..47cca4c7d8509
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link JobClusterEntrypoint} which is started with a job in a predefined
+ * location.
+ */
+public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
+
+ private static final String[] EMPTY_ARGS = new String[0];
+
+ @Nonnull
+ private final String jobClassName;
+
+ StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName) {
+ super(configuration);
+ this.jobClassName = jobClassName;
+ }
+
+ @Override
+ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+ final PackagedProgram packagedProgram = createPackagedProgram();
+ final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+ try {
+ final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+ jobGraph.setAllowQueuedScheduling(true);
+
+ return jobGraph;
+ } catch (Exception e) {
+ throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
+ }
+ }
+
+ private PackagedProgram createPackagedProgram() throws FlinkException {
+ try {
+ final Class> mainClass = getClass().getClassLoader().loadClass(jobClassName);
+ return new PackagedProgram(mainClass, EMPTY_ARGS);
+ } catch (ClassNotFoundException | ProgramInvocationException e) {
+ throw new FlinkException("Could not load the provied entrypoint class.", e);
+ }
+ }
+
+ @Override
+ protected void registerShutdownActions(CompletableFuture terminationFuture) {
+ terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
+ }
+
+ @Override
+ protected ResourceManager> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl) throws Exception {
+ final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+ final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+ resourceManagerRuntimeServicesConfiguration,
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor());
+
+ return new StandaloneResourceManager(
+ rpcService,
+ ResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
+ resourceManagerConfiguration,
+ highAvailabilityServices,
+ heartbeatServices,
+ resourceManagerRuntimeServices.getSlotManager(),
+ metricRegistry,
+ resourceManagerRuntimeServices.getJobLeaderIdService(),
+ clusterInformation,
+ fatalErrorHandler);
+ }
+
+ public static void main(String[] args) {
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneJobClusterEntryPoint.class.getSimpleName(), args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+ StandaloneJobClusterConfiguration clusterConfiguration = null;
+
+ try {
+ clusterConfiguration = commandLineParser.parse(args);
+ } catch (FlinkParseException e) {
+ LOG.error("Could not parse command line arguments {}.", args, e);
+ commandLineParser.printHelp();
+ System.exit(1);
+ }
+
+ Configuration configuration = loadConfiguration(clusterConfiguration);
+
+ configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
+
+ StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration, clusterConfiguration.getJobClassName());
+
+ entrypoint.startCluster();
+ }
+
+}
diff --git a/flink-container/src/main/resources/log4j.properties b/flink-container/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000..62cb6ed9cca57
--- /dev/null
+++ b/flink-container/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+log4j.logger.org.apache.flink.mesos=DEBUG
+log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
new file mode 100644
index 0000000000000..1f39a0609e702
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link StandaloneJobClusterConfigurationParserFactory}.
+ */
+public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {
+
+ private static final CommandLineParser commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+
+ @Test
+ public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
+ final String configDir = "/foo/bar";
+ final String key = "key";
+ final String value = "value";
+ final int restPort = 1234;
+ final String jobClassName = "foobar";
+ final String arg1 = "arg1";
+ final String arg2 = "arg2";
+ final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2};
+
+ final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
+
+ assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
+ assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+ assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
+ final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();
+
+ assertThat(dynamicProperties, hasEntry(key, value));
+
+ assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
+ }
+
+ @Test
+ public void testOnlyRequiredArguments() throws FlinkParseException {
+ final String configDir = "/foo/bar";
+ final String jobClassName = "foobar";
+ final String[] args = {"--configDir", configDir, "--job-classname", jobClassName};
+
+ final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
+
+ assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
+ assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+ assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
+ }
+
+ @Test(expected = FlinkParseException.class)
+ public void testMissingRequiredArgument() throws FlinkParseException {
+ final String[] args = {};
+
+ commandLineParser.parse(args);
+ }
+}
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
new file mode 100644
index 0000000000000..360799d19a823
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ */
+public class StandaloneJobClusterEntryPointTest extends TestLogger {
+
+ @Test
+ public void testJobGraphRetrieval() throws FlinkException {
+ final Configuration configuration = new Configuration();
+ final int parallelism = 42;
+ configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
+ configuration,
+ TestJob.class.getCanonicalName());
+
+ final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+
+ assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName())));
+ assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
+ }
+
+}
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
new file mode 100644
index 0000000000000..5f8857fc35fb4
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+/**
+ * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ */
+public class TestJob {
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final DataStreamSource source = env.fromElements(1, 2, 3, 4);
+ final SingleOutputStreamOperator mapper = source.map(element -> 2 * element);
+ mapper.addSink(new DiscardingSink<>());
+
+ env.execute(TestJob.class.getCanonicalName());
+ }
+}
diff --git a/flink-container/src/test/resources/log4j-test.properties b/flink-container/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000000..b85f2f20239bf
--- /dev/null
+++ b/flink-container/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+
+# log whats going on between the tests
+log4j.logger.org.apache.flink.runtime.leaderelection=OFF
+log4j.logger.org.apache.flink.runtime.leaderretrieval=OFF
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
index 7447439b26300..62da39e592550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterConfigurationParserFactoryTest.java
@@ -73,5 +73,4 @@ public void testMissingRequiredArgument() throws FlinkParseException {
commandLineParser.parse(args);
}
-
}
diff --git a/pom.xml b/pom.xml
index 897ae3cf9e74a..1f35cd44892ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,7 @@ under the License.
flink-formats
flink-examples
flink-clients
+ flink-container
flink-queryable-state
flink-tests
flink-end-to-end-tests