Skip to content

Commit

Permalink
[FLINK-14188][runtime] Derive and register TaskExecutor to ResourceMa…
Browse files Browse the repository at this point in the history
…nager with default slot resource profile.
  • Loading branch information
xintongsong authored and azagrebin committed Dec 8, 2019
1 parent d2558f4 commit db33a49
Show file tree
Hide file tree
Showing 20 changed files with 216 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ public void testWorkerStarted() throws Exception {
final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
// send registration message
CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, dataPort, hardwareDescription, timeout);
resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, dataPort, hardwareDescription, ResourceProfile.ZERO, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
final TaskExecutorRegistrationSuccess registrationResponse = (TaskExecutorRegistrationSuccess) response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.flink.runtime.clusterframework;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;

Expand Down Expand Up @@ -84,6 +86,29 @@ private static String assembleDynamicConfigsStr(final Map<String, String> config
return sb.toString();
}

// ------------------------------------------------------------------------
// Generating Default Slot Resource Profiles
// ------------------------------------------------------------------------

@VisibleForTesting
public static ResourceProfile generateDefaultSlotResourceProfile(Configuration configuration) {
return generateDefaultSlotResourceProfile(
resourceSpecFromConfig(configuration),
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS));
}

public static ResourceProfile generateDefaultSlotResourceProfile(
TaskExecutorResourceSpec taskExecutorResourceSpec,
int numberOfSlots) {
return ResourceProfile.newBuilder()
.setCpuCores(taskExecutorResourceSpec.getCpuCores().divide(numberOfSlots))
.setTaskHeapMemory(taskExecutorResourceSpec.getTaskHeapSize().divide(numberOfSlots))
.setTaskOffHeapMemory(taskExecutorResourceSpec.getTaskOffHeapSize().divide(numberOfSlots))
.setManagedMemory(taskExecutorResourceSpec.getManagedMemorySize().divide(numberOfSlots))
.setShuffleMemory(taskExecutorResourceSpec.getShuffleMemSize().divide(numberOfSlots))
.build();
}

// ------------------------------------------------------------------------
// Memory Configuration Calculations
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public CompletableFuture<RegistrationResponse> registerTaskExecutor(
final ResourceID taskExecutorResourceId,
final int dataPort,
final HardwareDescription hardwareDescription,
final ResourceProfile defaultResourceProfile,
final Time timeout) {

CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
Expand Down Expand Up @@ -95,6 +96,7 @@ CompletableFuture<Acknowledge> requestSlot(
* @param resourceId The resource ID of the TaskExecutor that registers
* @param dataPort port used for data communication between TaskExecutors
* @param hardwareDescription of the registering TaskExecutor
* @param defaultResourceProfile of the registering TaskExecutor
* @param timeout The timeout for the response.
*
* @return The future to the response by the ResourceManager.
Expand All @@ -104,6 +106,7 @@ CompletableFuture<RegistrationResponse> registerTaskExecutor(
ResourceID resourceId,
int dataPort,
HardwareDescription hardwareDescription,
ResourceProfile defaultResourceProfile,
@RpcTimeout Time timeout);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,8 @@ private void connectToResourceManager() {
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener());
new ResourceManagerRegistrationListener(),
taskManagerConfiguration.getDefaultSlotResourceProfile());
resourceManagerConnection.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class TaskExecutorToResourceManagerConnection

private final RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> registrationListener;

private final ResourceProfile defaultSlotResourceProfile;

public TaskExecutorToResourceManagerConnection(
Logger log,
RpcService rpcService,
Expand All @@ -68,7 +71,8 @@ public TaskExecutorToResourceManagerConnection(
String resourceManagerAddress,
ResourceManagerId resourceManagerId,
Executor executor,
RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> registrationListener) {
RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> registrationListener,
ResourceProfile defaultSlotResourceProfile) {

super(log, resourceManagerAddress, resourceManagerId, executor);

Expand All @@ -79,6 +83,7 @@ public TaskExecutorToResourceManagerConnection(
this.dataPort = dataPort;
this.hardwareDescription = checkNotNull(hardwareDescription);
this.registrationListener = checkNotNull(registrationListener);
this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
}

@Override
Expand All @@ -92,7 +97,8 @@ protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskEx
taskManagerAddress,
taskManagerResourceId,
dataPort,
hardwareDescription);
hardwareDescription,
defaultSlotResourceProfile);
}

@Override
Expand Down Expand Up @@ -125,6 +131,8 @@ private static class ResourceManagerRegistration

private final HardwareDescription hardwareDescription;

private final ResourceProfile defaultSlotResourceProfile;

ResourceManagerRegistration(
Logger log,
RpcService rpcService,
Expand All @@ -134,13 +142,15 @@ private static class ResourceManagerRegistration
String taskExecutorAddress,
ResourceID resourceID,
int dataPort,
HardwareDescription hardwareDescription) {
HardwareDescription hardwareDescription,
ResourceProfile defaultSlotResourceProfile) {

super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId, retryingRegistrationConfiguration);
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
this.resourceID = checkNotNull(resourceID);
this.dataPort = dataPort;
this.hardwareDescription = checkNotNull(hardwareDescription);
this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
}

@Override
Expand All @@ -153,6 +163,7 @@ protected CompletableFuture<RegistrationResponse> invokeRegistration(
resourceID,
dataPort,
hardwareDescription,
defaultSlotResourceProfile,
timeout);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
Expand All @@ -49,6 +50,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

private final int numberSlots;

private final ResourceProfile defaultSlotResourceProfile;

private final String[] tmpDirectories;

private final Time timeout;
Expand Down Expand Up @@ -79,6 +82,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

public TaskManagerConfiguration(
int numberSlots,
ResourceProfile defaultSlotResourceProfile,
String[] tmpDirectories,
Time timeout,
@Nullable Time maxRegistrationDuration,
Expand All @@ -94,6 +98,7 @@ public TaskManagerConfiguration(
RetryingRegistrationConfiguration retryingRegistrationConfiguration) {

this.numberSlots = numberSlots;
this.defaultSlotResourceProfile = defaultSlotResourceProfile;
this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
this.timeout = Preconditions.checkNotNull(timeout);
this.maxRegistrationDuration = maxRegistrationDuration;
Expand All @@ -113,6 +118,10 @@ public int getNumberSlots() {
return numberSlots;
}

public ResourceProfile getDefaultSlotResourceProfile() {
return defaultSlotResourceProfile;
}

public Time getTimeout() {
return timeout;
}
Expand Down Expand Up @@ -176,7 +185,7 @@ public RetryingRegistrationConfiguration getRetryingRegistrationConfiguration()
// Static factory methods
// --------------------------------------------------------------------------------------------

public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
public static TaskManagerConfiguration fromConfiguration(Configuration configuration, ResourceProfile defaultSlotResourceProfile) {
int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

if (numberSlots == -1) {
Expand Down Expand Up @@ -259,6 +268,7 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura

return new TaskManagerConfiguration(
numberSlots,
defaultSlotResourceProfile,
tmpDirPaths,
timeout,
finiteRegistrationDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
Expand Down Expand Up @@ -353,12 +356,16 @@ public static TaskExecutor startTaskManager(

InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());

final TaskExecutorResourceSpec taskExecutorResourceSpec;
taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(
configuration,
resourceID,
remoteAddress,
localCommunicationOnly);
localCommunicationOnly,
taskExecutorResourceSpec);

Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
Expand All @@ -371,7 +378,10 @@ public static TaskExecutor startTaskManager(
taskManagerMetricGroup.f1,
rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.

TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
ResourceProfile defaultSlotResourceProfile = TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
taskExecutorResourceSpec,
taskManagerServicesConfiguration.getNumberOfSlots());
TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration, defaultSlotResourceProfile);

String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
Expand Down Expand Up @@ -196,7 +195,8 @@ public static TaskManagerServicesConfiguration fromConfiguration(
Configuration configuration,
ResourceID resourceID,
InetAddress remoteAddress,
boolean localCommunicationOnly) {
boolean localCommunicationOnly,
TaskExecutorResourceSpec taskExecutorResourceSpec) {
final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
if (localStateRootDir.length == 0) {
Expand All @@ -212,7 +212,6 @@ public static TaskManagerServicesConfiguration fromConfiguration(

final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration);

final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
return new TaskManagerServicesConfiguration(
configuration,
resourceID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.util.TestLogger;

import org.junit.After;
Expand Down Expand Up @@ -63,6 +64,16 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
MemorySize.parse("7m"),
MemorySize.parse("8m"));

private static final int NUMBER_OF_SLOTS = 2;

private static final ResourceProfile DEFAULT_RESOURCE_PROFILE = ResourceProfile.newBuilder()
.setCpuCores(new CPUResource(0.5))
.setTaskHeapMemory(MemorySize.parse("3m").divide(NUMBER_OF_SLOTS))
.setTaskOffHeapMemory(MemorySize.parse("2m"))
.setShuffleMemory(MemorySize.parse("5m").divide(NUMBER_OF_SLOTS))
.setManagedMemory(MemorySize.parse("3m"))
.build();

private static Map<String, String> oldEnvVariables;

@Before
Expand Down Expand Up @@ -502,6 +513,13 @@ public void testConfigTotalProcessMemoryAddUpFailure() {
validateFail(conf);
}

@Test
public void testGenerateDefaultSlotProfile() {
assertThat(
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(TM_RESOURCE_SPEC, NUMBER_OF_SLOTS),
is(DEFAULT_RESOURCE_PROFILE));
}

private void validateInAllConfigurations(final Configuration customConfig, Consumer<TaskExecutorResourceSpec> validateFunc) {
validateInConfigWithExplicitTaskHeapAndManagedMem(customConfig, validateFunc);
validateInConfigWithExplicitTotalFlinkMem(customConfig, validateFunc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void testDelayedRegisterTaskExecutor() throws Exception {
TestingUtils.defaultExecutor()));

CompletableFuture<RegistrationResponse> firstFuture =
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, ResourceProfile.ZERO, fastTimeout);
try {
firstFuture.get();
fail("Should have failed because connection to taskmanager is delayed beyond timeout");
Expand All @@ -249,7 +249,7 @@ public void testDelayedRegisterTaskExecutor() throws Exception {
// second registration after timeout is with no delay, expecting it to be succeeded
rpcService.resetRpcGatewayFutureFunction();
CompletableFuture<RegistrationResponse> secondFuture =
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, ResourceProfile.ZERO, TIMEOUT);
RegistrationResponse response = secondFuture.get();
assertTrue(response instanceof TaskExecutorRegistrationSuccess);

Expand Down Expand Up @@ -339,6 +339,7 @@ private CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceMan
taskExecutorResourceID,
dataPort,
hardwareDescription,
ResourceProfile.ZERO,
TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway,
taskExecutorId,
dataPort,
hardwareDescription,
ResourceProfile.ZERO,
TestingUtils.TIMEOUT());

assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
Expand Down
Loading

0 comments on commit db33a49

Please sign in to comment.