Skip to content

Commit

Permalink
[hotfix][runtime] Code deduplication in ResourceManagerFactory and it…
Browse files Browse the repository at this point in the history
…s implementations.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 27, 2020
1 parent b9fe201 commit 63b13c5
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected DispatcherResourceManagerComponentFactory createDispatcherResourceMana
new DefaultDispatcherRunnerFactory(
ApplicationDispatcherLeaderProcessFactoryFactory
.create(configuration, SessionDispatcherFactory.INSTANCE, program)),
StandaloneResourceManagerFactory.INSTANCE,
StandaloneResourceManagerFactory.getInstance(),
JobRestEndpointFactory.INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;

import javax.annotation.Nullable;

Expand All @@ -57,7 +58,7 @@ public static KubernetesResourceManagerFactory getInstance() {
}

@Override
public ResourceManager<KubernetesWorkerNode> createActiveResourceManager(
public ResourceManager<KubernetesWorkerNode> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
Expand All @@ -66,14 +67,9 @@ public ResourceManager<KubernetesWorkerNode> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration =
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(
configuration, KubernetesWorkerResourceSpecFactory.INSTANCE);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

final KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration =
new KubernetesResourceManagerConfiguration(
configuration.getString(KubernetesConfigOptions.CLUSTER_ID),
Expand All @@ -85,13 +81,19 @@ public ResourceManager<KubernetesWorkerNode> createActiveResourceManager(
configuration,
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
resourceManagerRuntimeServices.getSlotManager(),
ResourceManagerPartitionTrackerImpl::new,
rmRuntimeServices.getJobLeaderIdService(),
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
KubeClientFactory.fromConfiguration(configuration),
kubernetesResourceManagerConfiguration);
}

@Override
protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(
Configuration configuration) throws ConfigurationException {
return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, KubernetesWorkerResourceSpecFactory.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,7 +63,7 @@ public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnul
}

@Override
public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
Expand All @@ -71,13 +72,8 @@ public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration =
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, MesosWorkerResourceSpecFactory.INSTANCE);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception {

final MesosTaskManagerParameters taskManagerParameters = MesosUtils.createTmParameters(configuration, LOG);
final ContainerSpecification taskManagerContainerSpec = MesosUtils.createContainerSpec(configuration);
Expand All @@ -87,9 +83,9 @@ public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
resourceId,
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
resourceManagerRuntimeServices.getSlotManager(),
ResourceManagerPartitionTrackerImpl::new,
rmRuntimeServices.getJobLeaderIdService(),
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
configuration,
Expand All @@ -100,4 +96,10 @@ public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
webInterfaceUrl,
resourceManagerMetricGroup);
}

@Override
protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(
Configuration configuration) throws ConfigurationException {
return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, MesosWorkerResourceSpecFactory.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public StandaloneSessionClusterEntrypoint(Configuration configuration) {

@Override
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ protected Collection<? extends DispatcherResourceManagerComponent> createDispatc

@Nonnull
DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*
* @param <T> type of the {@link ResourceIDRetrievable}
*/
public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> extends ResourceManagerFactory<T> {

@Override
public ResourceManager<T> createResourceManager(
Expand All @@ -54,7 +54,7 @@ public ResourceManager<T> createResourceManager(
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
return createActiveResourceManager(
return super.createResourceManager(
createActiveResourceManagerConfiguration(configuration),
resourceId,
rpcService,
Expand All @@ -66,19 +66,8 @@ public ResourceManager<T> createResourceManager(
resourceManagerMetricGroup);
}

private static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
return TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
originalConfiguration, TaskManagerOptions.TOTAL_PROCESS_MEMORY);
}

protected abstract ResourceManager<T> createActiveResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;

import javax.annotation.Nullable;

Expand All @@ -35,16 +36,57 @@
*
* @param <T> type of the workers of the ResourceManager
*/
public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {

ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {

public ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {

final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
configuration, rpcService, highAvailabilityServices);

return createResourceManager(
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
resourceManagerMetricGroup,
resourceManagerRuntimeServices);
}

protected abstract ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) throws Exception;

private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices) throws ConfigurationException {
return ResourceManagerRuntimeServices.fromConfiguration(
createResourceManagerRuntimeServicesConfiguration(configuration),
highAvailabilityServices,
rpcService.getScheduledExecutor());
}

protected abstract ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(
Configuration configuration) throws ConfigurationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,35 @@
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;

import javax.annotation.Nullable;

/**
* {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
*/
public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
INSTANCE;
public final class StandaloneResourceManagerFactory extends ResourceManagerFactory<ResourceID> {

private static final StandaloneResourceManagerFactory INSTANCE = new StandaloneResourceManagerFactory();

private StandaloneResourceManagerFactory() {}

public static StandaloneResourceManagerFactory getInstance() {
return INSTANCE;
}

@Override
public ResourceManager<ResourceID> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration =
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, ArbitraryWorkerResourceSpecFactory.INSTANCE);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
protected ResourceManager<ResourceID> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

Expand All @@ -73,4 +76,10 @@ public ResourceManager<ResourceID> createResourceManager(
standaloneClusterStartupPeriodTime,
AkkaUtils.getTimeoutAsTime(configuration));
}

@Override
protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(
Configuration configuration) throws ConfigurationException {
return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, ArbitraryWorkerResourceSpecFactory.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testCancelingOnProcessFailure() throws Exception {
config.setInteger(JobManagerOptions.PORT, jobManagerPort);

final DispatcherResourceManagerComponentFactory resourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
StandaloneResourceManagerFactory.INSTANCE);
StandaloneResourceManagerFactory.getInstance());
DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null;

final ScheduledExecutorService ioExecutor = TestingUtils.defaultExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.yarn.YarnResourceManager;
import org.apache.flink.yarn.YarnWorkerNode;

Expand All @@ -51,7 +52,7 @@ public static YarnResourceManagerFactory getInstance() {
}

@Override
public ResourceManager<YarnWorkerNode> createActiveResourceManager(
public ResourceManager<YarnWorkerNode> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
Expand All @@ -60,13 +61,8 @@ public ResourceManager<YarnWorkerNode> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration =
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, YarnWorkerResourceSpecFactory.INSTANCE);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

return new YarnResourceManager(
rpcService,
Expand All @@ -75,12 +71,18 @@ public ResourceManager<YarnWorkerNode> createActiveResourceManager(
System.getenv(),
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
resourceManagerRuntimeServices.getSlotManager(),
ResourceManagerPartitionTrackerImpl::new,
rmRuntimeServices.getJobLeaderIdService(),
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
webInterfaceUrl,
resourceManagerMetricGroup);
}

@Override
protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(
Configuration configuration) throws ConfigurationException {
return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration, YarnWorkerResourceSpecFactory.INSTANCE);
}
}

0 comments on commit 63b13c5

Please sign in to comment.