Skip to content

Commit

Permalink
[FLINK-7076] [yarn] Implement stopWorker logic for YarnResourceManager
Browse files Browse the repository at this point in the history
This closes apache#4729.
  • Loading branch information
Shuyi Chen authored and tillrohrmann committed Nov 8, 2017
1 parent 7d29369 commit 59e3b01
Show file tree
Hide file tree
Showing 8 changed files with 517 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,13 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}

@Override
public boolean stopWorker(ResourceID resourceID) {
LOG.info("Stopping worker {}.", resourceID);

public boolean stopWorker(RegisteredMesosWorkerNode workerNode) {
LOG.info("Stopping worker {}.", workerNode.getResourceID());
try {

if (workersInLaunch.containsKey(resourceID)) {
if (workersInLaunch.containsKey(workerNode.getResourceID())) {
// update persistent state of worker to Released
MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
MesosWorkerStore.Worker worker = workersInLaunch.remove(workerNode.getResourceID());
worker = worker.releaseWorker();
workerStore.putWorker(worker);
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
Expand All @@ -440,11 +439,11 @@ public boolean stopWorker(ResourceID resourceID) {
launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), selfActor);
}
}
else if (workersBeingReturned.containsKey(resourceID)) {
LOG.info("Ignoring request to stop worker {} because it is already being stopped.", resourceID);
else if (workersBeingReturned.containsKey(workerNode.getResourceID())) {
LOG.info("Ignoring request to stop worker {} because it is already being stopped.", workerNode.getResourceID());
}
else {
LOG.warn("Unrecognized worker {}.", resourceID);
LOG.warn("Unrecognized worker {}.", workerNode.getResourceID());
}
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ public void testStopWorker() throws Exception {
resourceManager.launchCoordinator.expectMsgClass(LaunchCoordinator.Assign.class);

// tell the RM to stop the worker
resourceManager.stopWorker(extractResourceID(task1));
resourceManager.stopWorker(new RegisteredMesosWorkerNode(worker1launched));

// verify that the instance state was updated
MesosWorkerStore.Worker worker1Released = worker1launched.releaseWorker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
Expand Down Expand Up @@ -62,7 +63,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -86,7 +86,7 @@
* <li>{@link #requestSlot(JobMasterId, SlotRequest, Time)} requests a slot from the resource manager</li>
* </ul>
*/
public abstract class ResourceManager<WorkerType extends Serializable>
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway, LeaderContender {

Expand Down Expand Up @@ -800,21 +800,22 @@ protected void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId)
}

protected void releaseResource(InstanceID instanceId) {
ResourceID resourceID = null;
WorkerType worker = null;

// TODO: Improve performance by having an index on the instanceId
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
if (entry.getValue().getInstanceID().equals(instanceId)) {
resourceID = entry.getKey();
worker = entry.getValue().getWorker();
break;
}
}

if (resourceID != null) {
if (stopWorker(resourceID)) {
closeTaskManagerConnection(resourceID, new FlinkException("Worker was stopped."));
if (worker != null) {
if (stopWorker(worker)) {
closeTaskManagerConnection(worker.getResourceID(),
new FlinkException("Worker was stopped."));
} else {
log.debug("Worker {} was not stopped.", resourceID);
log.debug("Worker {} was not stopped.", worker.getResourceID());
}
} else {
// unregister in order to clean up potential left over state
Expand Down Expand Up @@ -958,10 +959,10 @@ public void handleError(final Exception exception) {
/**
* Stops the given worker.
*
* @param resourceID identifying the worker to be stopped
* @param worker The worker.
* @return True if the worker was stopped, otherwise false
*/
public abstract boolean stopWorker(ResourceID resourceID);
public abstract boolean stopWorker(WorkerType worker);

// ------------------------------------------------------------------------
// Static utility classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@

package org.apache.flink.runtime.resourcemanager.registration;

import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

/**
* This class extends the {@link TaskExecutorConnection}, adding the worker information.
*/
public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorConnection {
public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extends TaskExecutorConnection {

private final WorkerType worker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,21 @@ boolean isTaskManagerIdle(InstanceID instanceId) {
return false;
}
}

@VisibleForTesting
public void unregisterTaskManagersAndReleaseResources() {
Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator =
taskManagerRegistrations.entrySet().iterator();

while (taskManagerRegistrationIterator.hasNext()) {
TaskManagerRegistration taskManagerRegistration =
taskManagerRegistrationIterator.next().getValue();

taskManagerRegistrationIterator.remove();

internalUnregisterTaskManager(taskManagerRegistration);

resourceActions.releaseResource(taskManagerRegistration.getInstanceId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;
Expand All @@ -63,11 +65,14 @@
* The yarn implementation of the resource manager. Used when the system is started
* via the resource framework YARN.
*/
public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {

/** The process environment variables. */
private final Map<String, String> env;

/** YARN container map. Package private for unit test purposes. */
final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;

/** The default registration timeout for task executor in seconds. */
private static final int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300;

Expand Down Expand Up @@ -133,6 +138,7 @@ public YarnResourceManager(
this.flinkConfig = flinkConfig;
this.yarnConfig = new YarnConfiguration();
this.env = env;
this.workerNodeMap = new ConcurrentHashMap<>();
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;

Expand All @@ -149,25 +155,34 @@ public YarnResourceManager(
numPendingContainerRequests = 0;
}

@Override
protected void initialize() throws ResourceManagerException {
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
resourceManagerClient.init(yarnConfig);
resourceManagerClient.start();
protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient() {
AMRMClientAsync<AMRMClient.ContainerRequest> rmc = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
rmc.init(yarnConfig);
rmc.start();
try {
//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
//TODO: the third paramter should be the webmonitor address
resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
rmc.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
} catch (Exception e) {
log.info("registerApplicationMaster fail", e);
}
return rmc;
}

protected NMClient createAndStartNodeManagerClient() {
// create the client to communicate with the node managers
nodeManagerClient = NMClient.createNMClient();
nodeManagerClient.init(yarnConfig);
nodeManagerClient.start();
nodeManagerClient.cleanupRunningContainersOnStop(true);
NMClient nmc = NMClient.createNMClient();
nmc.init(yarnConfig);
nmc.start();
nmc.cleanupRunningContainersOnStop(true);
return nmc;
}

@Override
protected void initialize() throws ResourceManagerException {
resourceManagerClient = createAndStartResourceManagerClient();
nodeManagerClient = createAndStartNodeManagerClient();
}

@Override
Expand Down Expand Up @@ -227,14 +242,27 @@ public void startNewWorker(ResourceProfile resourceProfile) {
}

@Override
public boolean stopWorker(ResourceID resourceID) {
// TODO: Implement to stop the worker
return false;
public boolean stopWorker(YarnWorkerNode workerNode) {
if (workerNode != null) {
Container container = workerNode.getContainer();
log.info("Stopping container {}.", container.getId().toString());
// release the container on the node manager
try {
nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
} catch (Throwable t) {
log.warn("Error while calling YARN Node Manager to stop container", t);
}
resourceManagerClient.releaseAssignedContainer(container.getId());
workerNodeMap.remove(workerNode.getResourceID());
} else {
log.error("Can not find container with resource ID {}.", workerNode.getResourceID().toString());
}
return true;
}

@Override
protected ResourceID workerStarted(ResourceID resourceID) {
return resourceID;
protected YarnWorkerNode workerStarted(ResourceID resourceID) {
return workerNodeMap.get(resourceID);
}

// AMRMClientAsync CallbackHandler methods
Expand All @@ -260,10 +288,14 @@ public void onContainersAllocated(List<Container> containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
log.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
final String containerIdStr = container.getId().toString();
workerNodeMap.put(new ResourceID(containerIdStr),
new YarnWorkerNode(container));
try {
/** Context information used to start a TaskExecutor Java process */
ContainerLaunchContext taskExecutorLaunchContext =
createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
createTaskExecutorLaunchContext(
container.getResource(), containerIdStr, container.getNodeId().getHost());
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
}
catch (Throwable t) {
Expand Down
49 changes: 49 additions & 0 deletions flink-yarn/src/main/java/org/apache/flink/yarn/YarnWorkerNode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.yarn.api.records.Container;

/**
* A stored YARN worker, which contains the YARN container.
*/
public class YarnWorkerNode implements ResourceIDRetrievable {

private final ResourceID resourceID;
private final Container container;

public YarnWorkerNode(Container container) {
Preconditions.checkNotNull(container);
this.resourceID = new ResourceID(container.getId().toString());
this.container = container;
}

@Override
public ResourceID getResourceID() {
return resourceID;
}

public Container getContainer() {
return container;
}
}
Loading

0 comments on commit 59e3b01

Please sign in to comment.