[FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a detached mode. YARN container reallocation.

This commit changes the following:
[FLINK-1629]: Users can now "fire and forget" jobs or YARN sessions to YARN (detached mode).
[FLINK-1630]: Failed YARN containers are now reallocated during the lifetime of a YARN session.
[FLINK-1547]: Users can now specify if they want the ApplicationMaster (= the JobManager = the entire YARN session) to restart on failure, and how often. After the first restart, the session will behave like a detached session. There is no backup of state between the old and the new AM.

The whole resource negotiation process between the RM and the AM has been reworked.
Flink is now much more flexible when requesting new containers and when giving back unneeded ones.

A new test case tests the container restart. It also verifies that the web frontend is properly started,
that logfile access is possible, and
that the configuration values the user specifies when starting the YARN session are visible in the web frontend.

This closes apache#468
rmetzger committed Mar 12, 2015
1 parent fd9ca4d commit 13bb21b
Showing 24 changed files with 1,291 additions and 510 deletions.
51 changes: 38 additions & 13 deletions docs/yarn_setup.md
@@ -23,19 +23,30 @@ under the License.
* This will be replaced by the TOC
{:toc}

## In a Nutshell
## Quickstart: Start a long-running Flink cluster on YARN

Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):

~~~bash
wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
cd flink-yarn-{{ site.FLINK_VERSION_SHORT }}/
tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
cd flink-{{ site.FLINK_VERSION_SHORT }}/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
~~~

Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.

Once the session has been started, you can submit jobs to the cluster using the `./bin/flink` tool.

## Quickstart: Run a Flink job on YARN

~~~bash
wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
cd flink-{{ site.FLINK_VERSION_SHORT }}/
./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar
~~~

## Apache Flink on Hadoop YARN using a YARN Session

Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows running various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to set up or install anything if there is already a YARN setup.
@@ -60,11 +71,11 @@ Download the YARN tgz package on the [download page]({{site.baseurl}}/downloads.
Extract the package using:

~~~bash
tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
cd flink-yarn-{{site.FLINK_VERSION_SHORT }}/
tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
cd flink-{{site.FLINK_VERSION_SHORT }}/
~~~

If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html). You can find the result of the build in `flink-dist/target/flink-{{ site.FLINK_VERSION_SHORT }}-bin/flink-yarn-{{ site.FLINK_VERSION_SHORT }}/` (*Note: The version might be different for you* ).
If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html). You can find the result of the build in `flink-dist/target/flink-{{ site.FLINK_VERSION_SHORT }}-bin/flink-{{ site.FLINK_VERSION_SHORT }}/` (*Note: The version might be different for you* ).


#### Start a Session
@@ -83,6 +94,7 @@ Usage:
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
@@ -102,18 +114,23 @@ The system will use the configuration in `conf/flink-config.yaml`. Please follow

Flink on YARN will overwrite the following configuration parameters: `jobmanager.rpc.address` (because the JobManager is always allocated on different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN), and `parallelization.degree.default` if the number of slots has been specified.

If you don't want to change the configuration file to pass configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
If you don't want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
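
As a rough illustration of how `-D` arguments can map to configuration entries, the following Java sketch splits each `-Dkey=value` argument at the first `=`. The class `DynamicPropertiesSketch` and its `parse` method are hypothetical names chosen for this example; this is not Flink's actual parser, only a minimal sketch of the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Flink.
public class DynamicPropertiesSketch {

    /** Collects every "-Dkey=value" argument into a map; all other arguments are ignored. */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property
            }
            String pair = arg.substring(2);
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // missing key or missing '=': skip malformed entries
            }
            props.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = parse(new String[] {
            "-n", "4",
            "-Dfs.overwrite-files=true",
            "-Dtaskmanager.network.numberOfBuffers=16368"
        });
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Splitting at the first `=` only means that values may themselves contain `=` characters.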

The example invocation starts 11 containers, since there is one additional container for the ApplicationMaster and Job Manager.

Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.

The client has to remain open to keep the deployment running. We suggest to use `screen`, which will start a detachable shell:
Stop the YARN session by stopping the unix process (using CTRL+C) or by entering 'stop' into the client.

1. Open `screen`,
2. Start Flink on YARN,
3. Use `CTRL+a`, then press `d` to detach the screen session,
4. Use `screen -r` to resume again.
#### Detached YARN session

If you do not want to keep the Flink YARN client running all the time, it's also possible to start a *detached* YARN session.
The parameter for that is called `-d` or `--detached`.

In that case, the Flink YARN client will only submit Flink to the cluster and then close itself.
Note that in this case it's not possible to stop the YARN session using Flink.

Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN session.
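
Because a detached session has no client left to stop it, tooling that manages such sessions has to keep the application ID around. The Java sketch below only assembles the `yarn application -kill <appId>` command line. The class name `KillDetachedSessionSketch` and the application ID are placeholders invented for this example, and actually launching the process is left commented out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or Hadoop.
public class KillDetachedSessionSketch {

    /** Builds the argument list for "yarn application -kill <appId>". */
    public static List<String> killCommand(String applicationId) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("An application ID is required");
        }
        return Arrays.asList("yarn", "application", "-kill", applicationId);
    }

    public static void main(String[] args) {
        // Placeholder ID; use the one printed by the detached client or shown by YARN.
        List<String> command = killCommand("application_0000000000000_0001");
        System.out.println(String.join(" ", command));
        // To actually run it (requires the yarn binary on the PATH):
        // new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```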


### Submit Job to Flink
@@ -187,6 +204,14 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).


## Recovery behavior of Flink on YARN

Flink's YARN client has the following configuration parameters to control how it behaves in case of container failures. These parameters can be set either in `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.
- `yarn.reallocate-failed`: This parameter controls whether Flink should reallocate failed TaskManager containers. Default: true
- `yarn.maximum-failed-containers`: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManagers (`-n`).
- `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the ApplicationMaster fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
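
The defaults listed above can be sketched in Java as follows. The configuration keys and default values come from the list. The class `YarnRecoverySettingsSketch` and the method `shouldFailSession` are hypothetical, and the exact comparison Flink uses to decide when the session fails is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the three recovery parameters; not Flink's own class.
public class YarnRecoverySettingsSketch {
    public final boolean reallocateFailed;
    public final int maxFailedContainers;
    public final int applicationAttempts;

    public YarnRecoverySettingsSketch(Map<String, String> conf, int requestedTaskManagers) {
        // Default: true
        this.reallocateFailed =
            Boolean.parseBoolean(conf.getOrDefault("yarn.reallocate-failed", "true"));
        // Default: the number of initially requested TaskManagers (-n)
        this.maxFailedContainers = Integer.parseInt(
            conf.getOrDefault("yarn.maximum-failed-containers", String.valueOf(requestedTaskManagers)));
        // Default: 1, meaning the session fails when the ApplicationMaster fails
        this.applicationAttempts =
            Integer.parseInt(conf.getOrDefault("yarn.application-attempts", "1"));
    }

    /** Assumption: the session fails once the failure count exceeds the maximum. */
    public boolean shouldFailSession(int failedContainers) {
        return failedContainers > maxFailedContainers;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("yarn.maximum-failed-containers", "2");
        YarnRecoverySettingsSketch settings = new YarnRecoverySettingsSketch(conf, 4);
        System.out.println("reallocate failed containers: " + settings.reallocateFailed);
        System.out.println("fail after 3 failures: " + settings.shouldFailSession(3));
    }
}
```
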
## Debugging a failed YARN session
@@ -58,7 +58,6 @@ public class FlinkYarnSessionCli {
public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";


private static final int CLIENT_POLLING_INTERVALL = 3;


@@ -73,14 +72,17 @@ public class FlinkYarnSessionCli {
private final Option TM_MEMORY;
private final Option CONTAINER;
private final Option SLOTS;
private final Option DETACHED;

/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
* -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
*/
private final Option DYNAMIC_PROPERTIES;

//------------------------------------ Internal fields -------------------------
private AbstractFlinkYarnCluster yarnCluster = null;
private boolean detachedMode = false;

public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
@@ -92,6 +94,7 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
}

public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
@@ -212,6 +215,10 @@ public boolean accept(File dir, String name) {

flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);

if(cmd.hasOption(DETACHED.getOpt())) {
detachedMode = true;
flinkYarnClient.setDetachedMode(detachedMode);
}
return flinkYarnClient;
}

@@ -234,6 +241,7 @@ private void printUsage() {
opt.addOption(QUEUE);
opt.addOption(SLOTS);
opt.addOption(DYNAMIC_PROPERTIES);
opt.addOption(DETACHED);
formatter.printHelp(" ", opt);
}

@@ -289,6 +297,7 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {

if(yarnCluster.hasFailed()) {
System.err.println("The YARN cluster has failed");
yarnCluster.shutdown();
}

// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
@@ -335,6 +344,7 @@ public void getYARNSessionCLIOptions(Options options) {
options.addOption(SHIP_PATH);
options.addOption(SLOTS);
options.addOption(DYNAMIC_PROPERTIES);
options.addOption(DETACHED);
}

public int run(String[] args) {
@@ -405,17 +415,25 @@ public int run(String[] args) {

//------------------ Cluster running, let user control it ------------

runInteractiveCli(yarnCluster);
if(detachedMode) {
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.", flinkYarnClient.getSessionFilesDir());
} else {
runInteractiveCli(yarnCluster);

if(!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown();
}
if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown();
}

try {
yarnPropertiesFile.delete();
} catch (Exception e) {
LOG.warn("Exception while deleting the JobManager address file", e);
try {
yarnPropertiesFile.delete();
} catch (Exception e) {
LOG.warn("Exception while deleting the JobManager address file", e);
}
}
}
return 0;
@@ -188,6 +188,35 @@ public final class ConfigConstants {
*/
public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";

/**
* Reallocate failed YARN containers.
*/
public static final String YARN_REALLOCATE_FAILED_CONTAINERS = "yarn.reallocate-failed";

/**
* The maximum number of failed YARN containers before entirely stopping
* the YARN session / job on YARN.
*
* By default, we take the number of initially requested containers.
*/
public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";

/**
* Set the number of retries for failed YARN ApplicationMasters/JobManagers.
* This value is usually limited by YARN.
*
* By default, it is 1.
*/
public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";

/**
* The heartbeat interval between the Application Master and the YARN Resource Manager.
*
* The default value is 5 (seconds).
*/
public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";


// ------------------------ Hadoop Configuration ------------------------

/**
@@ -26,6 +26,7 @@

import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +65,9 @@ public class WebInfoServer {
private final Server server;

/**
* Port for info server
* The assigned port where jetty is running.
*/
private final int port;
private int assignedPort;

/**
* Creates a new web info server. The server runs the servlets that implement the logic
@@ -87,10 +88,11 @@ public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive
throw new NullPointerException();
}

this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
// if port == 0, jetty will assign an available port.
int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
if (this.port <= 0) {
throw new IllegalArgumentException("Invalid port for the webserver: " + this.port);
if (port < 0) {
throw new IllegalArgumentException("Invalid port for the webserver: " + port);
}

final FiniteDuration timeout = AkkaUtils.getTimeout(config);
@@ -190,17 +192,24 @@ public WebInfoServer(Configuration config, ActorRef jobmanager, ActorRef archive
*/
public void start() throws Exception {
server.start();
LOG.info("Started web info server for JobManager on {}:{}",server.getConnectors()[0].getHost(), this.port);
final Connector connector = server.getConnectors()[0];
assignedPort = connector.getLocalPort(); // we have to use getLocalPort() instead of getPort() http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
String host = connector.getHost();
if(host == null) { // as per method documentation
host = "0.0.0.0";
}
LOG.info("Started web info server for JobManager on {}:{}", host, assignedPort);
}

/**
* Stop the webserver
*/
public void stop() throws Exception {
server.stop();
assignedPort = 0;
}

public int getServerPort() {
return this.port;
return this.assignedPort;
}
}
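
The `WebInfoServer` hunks above accept port 0 and read the actual port back after the server has started. The same idea can be sketched with a plain `java.net.ServerSocket` instead of Jetty. The class `EphemeralPortSketch` and its method are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Illustration only: uses java.net.ServerSocket, not Jetty or Flink code.
public class EphemeralPortSketch {

    /** Binds to port 0 and returns the port the operating system picked. */
    public static int bindToFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            // getLocalPort() reports the port that was actually bound, not the requested 0.
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Assigned port: " + bindToFreePort());
    }
}
```

The socket is closed again before the method returns, so the reported port is only a snapshot; a real server would keep the bound socket open, as the `WebInfoServer` does with its Jetty connector.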
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.util;

import org.slf4j.Logger;
import sun.misc.Signal;

/**
* This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
*/
public class SignalHandler {
private static boolean registered = false;

/**
* Our signal handler.
*/
private static class Handler implements sun.misc.SignalHandler {
final private Logger LOG;
final private sun.misc.SignalHandler prevHandler;

Handler(String name, Logger LOG) {
this.LOG = LOG;
prevHandler = Signal.handle(new Signal(name), this);
}

/**
* Handle an incoming signal.
*
* @param signal The incoming signal
*/
@Override
public void handle(Signal signal) {
LOG.error("RECEIVED SIGNAL " + signal.getNumber() + ": SIG" + signal.getName());
prevHandler.handle(signal);
}
}

/**
* Register some signal handlers.
*
* @param LOG The slf4j logger
*/
public static void register(final Logger LOG) {
if (registered) {
throw new IllegalStateException("Can't re-install the signal handlers.");
}
registered = true;
StringBuilder bld = new StringBuilder();
bld.append("registered UNIX signal handlers for [");
final String[] SIGNALS = { "TERM", "HUP", "INT" };
String separator = "";
for (String signalName : SIGNALS) {
try {
new Handler(signalName, LOG);
bld.append(separator);
bld.append(signalName);
separator = ", ";
} catch (Exception e) {
LOG.debug("Error while registering signal handler", e);
}
}
bld.append("]");
LOG.info(bld.toString());
}
}
@@ -49,5 +49,7 @@ public abstract class AbstractFlinkYarnClient {

public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception;

public abstract void setDetachedMode(boolean detachedMode);

public abstract String getSessionFilesDir();
}