Skip to content

Commit

Permalink
[FLINK-12213][network] Move network metrics setup into NetworkEnviron…
Browse files Browse the repository at this point in the history
…ment

At the moment NetworkEnvironment#getNetworkBufferPool is called to add network related MetricGroup. In order to simplify the public API in NetworkEnvironment which is regarded as default ShuffleService implementation, we could pass the TaskManagerMetricGroup into constructor of NetworkEnvironment, then the related network MetricGroup could be added internally.
  • Loading branch information
zhijiangW authored and zentol committed Apr 25, 2019
1 parent 5b2ca7f commit daa51b5
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.network;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
Expand Down Expand Up @@ -48,6 +50,10 @@ public class NetworkEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);

private static final String METRIC_GROUP_NETWORK = "Network";
private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";

private final Object lock = new Object();

private final NetworkEnvironmentConfiguration config;
Expand All @@ -62,7 +68,10 @@ public class NetworkEnvironment {

private boolean isShutdown;

public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) {
public NetworkEnvironment(
NetworkEnvironmentConfiguration config,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup) {
this.config = checkNotNull(config);

this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
Expand All @@ -78,9 +87,21 @@ public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPubli

this.taskEventPublisher = checkNotNull(taskEventPublisher);

registerNetworkMetrics(metricGroup, networkBufferPool);

isShutdown = false;
}

private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
checkNotNull(metricGroup);

MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK);
networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
networkBufferPool::getTotalNumberOfMemorySegments);
networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
networkBufferPool::getNumberOfAvailableMemorySegments);
}

// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
Expand All @@ -93,6 +114,7 @@ public ConnectionManager getConnectionManager() {
return connectionManager;
}

@VisibleForTesting
public NetworkBufferPool getNetworkBufferPool() {
return networkBufferPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -93,23 +91,19 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup(

public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry,
TaskManagerLocation taskManagerLocation,
NetworkEnvironment network,
String hostName,
ResourceID resourceID,
Optional<Time> systemResourceProbeInterval) {
final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
metricRegistry,
taskManagerLocation.getHostname(),
taskManagerLocation.getResourceID().toString());
hostName,
resourceID.toString());

MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);

// Initialize the TM metrics
instantiateStatusMetrics(statusGroup);

MetricGroup networkGroup = statusGroup
.addGroup("Network");
instantiateNetworkMetrics(networkGroup, network);

if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
}
Expand Down Expand Up @@ -139,15 +133,6 @@ public static RpcService startMetricsRpcService(Configuration configuration, Str
new BootstrapTools.FixedThreadPoolExecutorConfiguration(1, 1, threadPriority));
}

private static void instantiateNetworkMetrics(
MetricGroup metrics,
final NetworkEnvironment network) {

final NetworkBufferPool networkBufferPool = network.getNetworkBufferPool();
metrics.<Integer, Gauge<Integer>>gauge("TotalMemorySegments", networkBufferPool::getTotalNumberOfMemorySegments);
metrics.<Integer, Gauge<Integer>>gauge("AvailableMemorySegments", networkBufferPool::getNumberOfAvailableMemorySegments);
}

private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", mxBean::getTotalLoadedClassCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
Expand Down Expand Up @@ -357,19 +358,20 @@ public static TaskExecutor startTaskManager(
remoteAddress,
localCommunicationOnly);

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
TaskManagerLocation.getHostName(remoteAddress),
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
taskManagerMetricGroup,
resourceID,
rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
EnvironmentInformation.getMaxJvmHeapMemory());

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
taskManagerServices.getTaskManagerLocation(),
taskManagerServices.getNetworkEnvironment(),
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
Expand Down Expand Up @@ -220,16 +221,18 @@ public void shutDown() throws FlinkException {
/**
* Creates and returns the task manager services.
*
* @param resourceID resource ID of the task manager
* @param taskManagerServicesConfiguration task manager configuration
* @param taskIOExecutor executor for async IO operations.
* @param taskManagerMetricGroup metric group of the task manager
* @param resourceID resource ID of the task manager
* @param taskIOExecutor executor for async IO operations
* @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
* @param maxJvmHeapMemory the maximum JVM heap size
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
TaskManagerMetricGroup taskManagerMetricGroup,
ResourceID resourceID,
Executor taskIOExecutor,
long freeHeapMemoryWithDefrag,
Expand All @@ -241,7 +244,7 @@ public static TaskManagerServices fromConfiguration(
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

final NetworkEnvironment network = new NetworkEnvironment(
taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher);
taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup);
network.start();

final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private static String getFqdnHostName(InetAddress inetAddress) {
* @param inetAddress the network address that the TaskManager binds its sockets to
* @return hostname of the TaskManager
*/
private static String getHostName(InetAddress inetAddress) {
public static String getHostName(InetAddress inetAddress) {
String hostName;
String fqdnHostName = getFqdnHostName(inetAddress);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.runtime.io.network;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;

/**
Expand All @@ -44,6 +46,8 @@ public class NetworkEnvironmentBuilder {

private TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();

public NetworkEnvironmentBuilder setNumNetworkBuffers(int numNetworkBuffers) {
this.numNetworkBuffers = numNetworkBuffers;
return this;
Expand Down Expand Up @@ -89,6 +93,11 @@ public NetworkEnvironmentBuilder setTaskEventDispatcher(TaskEventDispatcher task
return this;
}

public NetworkEnvironmentBuilder setMetricGroup(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
return this;
}

public NetworkEnvironment build() {
return new NetworkEnvironment(
new NetworkEnvironmentConfiguration(
Expand All @@ -100,6 +109,7 @@ public NetworkEnvironment build() {
floatingNetworkBuffersPerGate,
isCreditBased,
nettyConfig),
taskEventDispatcher);
taskEventDispatcher,
metricGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.util.FileUtils;
Expand Down Expand Up @@ -211,6 +212,7 @@ private TaskManagerServices createTaskManagerServices(
TaskManagerServicesConfiguration config) throws Exception {
return TaskManagerServices.fromConfiguration(
config,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
ResourceID.generate(),
Executors.directExecutor(),
MEM_SIZE_PARAM,
Expand Down

0 comments on commit daa51b5

Please sign in to comment.