Skip to content

Commit

Permalink
[FLINK-14303][metrics] Replace JobManagerMetricGroup with ResourceMan…
Browse files Browse the repository at this point in the history
…agerMetricGroup in ResourceManager

With this commit, the ResourceManager uses the ResourceManagerMetricGroup to register its
metrics.
  • Loading branch information
tillrohrmann committed Oct 2, 2019
1 parent 006a15b commit 7eda4fc
Show file tree
Hide file tree
Showing 21 changed files with 71 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
Expand Down Expand Up @@ -164,7 +164,7 @@ public MesosResourceManager(
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
@Nullable String webUiUrl,
JobManagerMetricGroup jobManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup) {
super(
rpcService,
resourceManagerEndpointId,
Expand All @@ -175,7 +175,7 @@ public MesosResourceManager(
jobLeaderIdService,
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup);
resourceManagerMetricGroup);

this.mesosServices = Preconditions.checkNotNull(mesosServices);
this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
Expand Down Expand Up @@ -72,7 +72,7 @@ public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
Expand All @@ -95,6 +95,6 @@ public ResourceManager<RegisteredMesosWorkerNode> createActiveResourceManager(
taskManagerParameters,
taskManagerContainerSpec,
webInterfaceUrl,
jobManagerMetricGroup);
resourceManagerMetricGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
Expand Down Expand Up @@ -170,7 +170,7 @@ public TestingMesosResourceManager(
MesosConfiguration mesosConfig,
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
JobManagerMetricGroup jobManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup) {
super(
rpcService,
resourceManagerEndpointId,
Expand All @@ -187,7 +187,7 @@ public TestingMesosResourceManager(
taskManagerParameters,
taskManagerContainerSpec,
null,
jobManagerMetricGroup);
resourceManagerMetricGroup);
}

@Override
Expand Down Expand Up @@ -296,7 +296,7 @@ static class Context implements AutoCloseable {
rmServices.mesosConfig,
tmParams,
containerSpecification,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());

// TaskExecutors
task1Executor = mockTaskExecutor(task1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
Expand Down Expand Up @@ -113,6 +114,7 @@ public DispatcherResourceManagerComponent<T> create(
WebMonitorEndpoint<U> webMonitorEndpoint = null;
ResourceManager<?> resourceManager = null;
JobManagerMetricGroup jobManagerMetricGroup = null;
ResourceManagerMetricGroup resourceManagerMetricGroup = null;
T dispatcher = null;

try {
Expand Down Expand Up @@ -163,10 +165,7 @@ public DispatcherResourceManagerComponent<T> create(

final String hostname = RpcUtils.getHostname(rpcService);

jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
hostname);

resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
Expand All @@ -176,10 +175,14 @@ public DispatcherResourceManagerComponent<T> create(
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
jobManagerMetricGroup);
resourceManagerMetricGroup);

final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
hostname);

final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
configuration,
highAvailabilityServices,
Expand Down Expand Up @@ -256,6 +259,10 @@ public DispatcherResourceManagerComponent<T> create(
jobManagerMetricGroup.close();
}

if (resourceManagerMetricGroup != null) {
resourceManagerMetricGroup.close();
}

throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
Expand Down Expand Up @@ -55,7 +55,7 @@ public ResourceManager<T> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
return createActiveResourceManager(
createActiveResourceManagerConfiguration(configuration),
resourceId,
Expand All @@ -65,7 +65,7 @@ public ResourceManager<T> createResourceManager(
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
jobManagerMetricGroup);
resourceManagerMetricGroup);
}

public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
Expand All @@ -89,5 +89,5 @@ protected abstract ResourceManager<T> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
Expand Down Expand Up @@ -132,7 +132,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>

private final ClusterInformation clusterInformation;

private final JobManagerMetricGroup jobManagerMetricGroup;
private final ResourceManagerMetricGroup resourceManagerMetricGroup;

/** The service to elect a ResourceManager leader. */
private LeaderElectionService leaderElectionService;
Expand Down Expand Up @@ -161,7 +161,7 @@ public ResourceManager(
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
JobManagerMetricGroup jobManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup) {

super(rpcService, resourceManagerEndpointId, null);

Expand All @@ -172,7 +172,7 @@ public ResourceManager(
this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
this.clusterInformation = checkNotNull(clusterInformation);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);

this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
Expand Down Expand Up @@ -257,6 +257,8 @@ private void stopResourceManagerServices() throws Exception {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

resourceManagerMetricGroup.close();

clearStateInternal();

ExceptionUtils.tryRethrowException(exception);
Expand Down Expand Up @@ -728,13 +730,13 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
}

private void registerSlotAndTaskExecutorMetrics() {
jobManagerMetricGroup.gauge(
resourceManagerMetricGroup.gauge(
MetricNames.TASK_SLOTS_AVAILABLE,
() -> (long) slotManager.getNumberFreeSlots());
jobManagerMetricGroup.gauge(
resourceManagerMetricGroup.gauge(
MetricNames.TASK_SLOTS_TOTAL,
() -> (long) slotManager.getNumberRegisteredSlots());
jobManagerMetricGroup.gauge(
resourceManagerMetricGroup.gauge(
MetricNames.NUM_REGISTERED_TASK_MANAGERS,
() -> (long) taskExecutors.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

Expand All @@ -48,7 +48,7 @@ ResourceManager<T> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception;
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;

default String generateEndpointIdWithUUID() {
return getEndpointId() + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand Down Expand Up @@ -59,7 +59,7 @@ public StandaloneResourceManager(
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
JobManagerMetricGroup jobManagerMetricGroup,
ResourceManagerMetricGroup resourceManagerMetricGroup,
Time startupPeriodTime) {
super(
rpcService,
Expand All @@ -71,7 +71,7 @@ public StandaloneResourceManager(
jobLeaderIdService,
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup);
resourceManagerMetricGroup);
this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;

Expand All @@ -47,7 +47,7 @@ public ResourceManager<ResourceID> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
Expand All @@ -66,7 +66,7 @@ public ResourceManager<ResourceID> createResourceManager(
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
Expand Down Expand Up @@ -53,7 +53,7 @@ public ResourceManager<ResourceID> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
Expand All @@ -72,7 +72,7 @@ public ResourceManager<ResourceID> createResourceManager(
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
jobManagerMetricGroup,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -70,7 +70,7 @@ public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemor
new TestingFatalErrorHandler(),
new ClusterInformation("foobar", 1234),
null,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
} finally {
RpcUtils.terminateRpcService(rpcService, Time.seconds(10L));
}
Expand All @@ -88,7 +88,7 @@ protected ResourceManager<ResourceID> createActiveResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
JobManagerMetricGroup jobManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup) {
assertThat(configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE), is(true));

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -90,7 +89,7 @@ public void confirmLeaderSessionID(UUID leaderId) {
resourceManagerRuntimeServices.getJobLeaderIdService(),
new ClusterInformation("localhost", 1234),
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
Time.minutes(5L)) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
Expand Down Expand Up @@ -151,7 +150,7 @@ private ResourceManager<?> createAndStartResourceManager() throws Exception {
jobLeaderIdService,
new ClusterInformation("localhost", 1234),
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
Time.minutes(5L));

resourceManager.start();
Expand Down
Loading

0 comments on commit 7eda4fc

Please sign in to comment.