Skip to content

Commit

Permalink
[FLINK-3544][runtime] introduce ResourceManager component
Browse files Browse the repository at this point in the history
This closes apache#1741.
  • Loading branch information
mxm committed Mar 29, 2016
1 parent b21e63c commit 92ff2b1
Show file tree
Hide file tree
Showing 89 changed files with 4,967 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
Expand All @@ -69,7 +70,6 @@
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -1050,10 +1050,11 @@ protected Client getClient(
logAndSysout("Waiting until all TaskManagers have connected");

while(true) {
FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
GetClusterStatusResponse status = yarnCluster.getClusterStatus();
if (status != null) {
if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
logAndSysout("TaskManager status (" + status.getNumberOfTaskManagers() + "/" + flinkYarnClient.getTaskManagerCount() + ")");
if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+ flinkYarnClient.getTaskManagerCount() + ")");
} else {
logAndSysout("All TaskManagers are connected");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
Expand Down Expand Up @@ -303,11 +303,12 @@ public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
while (true) {
// ------------------ check if there are updates by the cluster -----------

FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
GetClusterStatusResponse status = yarnCluster.getClusterStatus();
if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
System.err.println("Number of connected TaskManagers changed to " +
status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots());
numTaskmanagers = status.getNumberOfTaskManagers();
status.numRegisteredTaskManagers() + ". " +
"Slots available: " + status.totalNumberOfSlots());
numTaskmanagers = status.numRegisteredTaskManagers();
}

List<String> messages = yarnCluster.getNewMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";

/**
* The config parameter defining the network port to connect to
* for communication with the resource manager.
*/
public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";

/**
* The config parameter defining the storage directory to be used by the blob server.
*/
Expand Down Expand Up @@ -214,7 +220,7 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";

// --------------------------- Runtime Algorithms -------------------------------
// ------------------------ Runtime Algorithms ----------------------------

/**
* Parameter for the maximum fan for out-of-core algorithms.
Expand All @@ -240,6 +246,37 @@ public final class ConfigConstants {
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";


// -------- Common Resource Framework Configuration (YARN & Mesos) --------

/**
* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
* for other JVM memory usage.
*/
public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio";

/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
*/
public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min";

/**
* Prefix for passing custom environment variables to Flink's master process.
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
*/
public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env.";

/**
* Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the workers (TaskManagers)
*/
public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = "containered.taskmanager.env.";

// --------------------------Standalone Setup -----------------------------


// ------------------------ YARN Configuration ------------------------

/**
Expand All @@ -250,16 +287,19 @@ public final class ConfigConstants {
/**
* Percentage of heap space to remove from containers started by YARN.
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";

/**
* Minimum amount of memory to remove from the heap space as a safety margin.
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";

/**
* Reallocate failed YARN containers.
*/
@Deprecated
public static final String YARN_REALLOCATE_FAILED_CONTAINERS = "yarn.reallocate-failed";

/**
Expand Down Expand Up @@ -300,14 +340,16 @@ public final class ConfigConstants {
* yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
*/
@Deprecated
public static final String YARN_APPLICATION_MASTER_ENV_PREFIX = "yarn.application-master.env.";

/**
* Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables.
*/
@Deprecated
public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";

/**
* The config parameter defining the Akka actor system port for the ApplicationMaster and
* JobManager
Expand Down Expand Up @@ -582,6 +624,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_JOB_MANAGER_IPC_PORT = 6123;

/**
* The default network port of the resource manager.
*/
public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;

/**
* Default number of retries for failed BLOB fetches.
*/
Expand Down Expand Up @@ -687,27 +734,27 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;

// ------------------------ YARN Configuration ------------------------

// ------ Common Resource Framework Configuration (YARN & Mesos) ------

/**
* Minimum amount of Heap memory to subtract from the requested TaskManager size.
* We came up with these values experimentally.
* Flink fails when the cutoff is set only to 500 mb.
* Minimum amount of memory to subtract from the process memory to get the TaskManager
* heap size. We came up with these values experimentally.
*/
public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 600;
public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;

/**
* Relative amount of memory to subtract from the requested memory.
* Relative amount of memory to subtract from Java process memory to get the TaskManager
* heap size
*/
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;

/**
* Default port for the application master is 0, which means
* the operating system assigns an ephemeral port
*/
public static final String DEFAULT_YARN_APPLICATION_MASTER_PORT = "0";


public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";

// ------------------------ File System Behavior ------------------------

/**
Expand Down Expand Up @@ -814,6 +861,10 @@ public final class ConfigConstants {

public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;

public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager";

public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;

public static final String LOCAL_START_WEBSERVER = "local.start-webserver";

// --------------------------- Recovery ---------------------------------
Expand Down
5 changes: 2 additions & 3 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,11 @@ public static String hostAndPortToUrlString(String host, int port) throws Unknow
* @return Set of ports from the range definition
* @throws NumberFormatException If an invalid string is passed.
*/

public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
final String[] ranges = rangeDefinition.trim().split(",");
List<Iterator<Integer>> iterators = new ArrayList<>(ranges.length);
for(String rawRange: ranges) {
Iterator<Integer> rangeIterator = null;
Iterator<Integer> rangeIterator;
String range = rawRange.trim();
int dashIdx = range.indexOf('-');
if (dashIdx == -1) {
Expand Down Expand Up @@ -229,7 +228,7 @@ public void remove() {
* @param factory A factory for creating the SocketServer
* @return null if no port was available or an allocated socket.
*/
public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) throws IOException {
public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) {
while (portsIterator.hasNext()) {
int port = portsIterator.next();
LOG.debug("Trying to open socket on port {}", port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.runtime.akka;

import akka.actor.UntypedActor;

import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -129,7 +131,7 @@ private void handleDiscardedMessage(UUID expectedLeaderSessionID, LeaderSessionM
* @return The deocrated message
*/
protected Object decorateMessage(Object message) {
if(message instanceof RequiresLeaderSessionID) {
if (message instanceof RequiresLeaderSessionID) {
return new LeaderSessionMessage(getLeaderSessionID(), message);
} else {
return message;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.clusterframework;

/**
* The status of an application.
*/
public enum ApplicationStatus {

/** Application finished successfully */
SUCCEEDED(0),

/** Application encountered an unrecoverable failure or error */
FAILED(1443),

/** Application was canceled or killed on request */
CANCELED(1444),

/** Application status is not known */
UNKNOWN(1445);

// ------------------------------------------------------------------------

/** The associated process exit code */
private final int processExitCode;

private ApplicationStatus(int exitCode) {
this.processExitCode = exitCode;
}

/**
* Gets the process exit code associated with this status
* @return The associated process exit code.
*/
public int processExitCode() {
return processExitCode;
}
}
Loading

0 comments on commit 92ff2b1

Please sign in to comment.