diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 0c0b0dd2ffbc5..5c073f3ad59d8 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -7,6 +7,11 @@ + +
metrics.internal.query-service.port
+ "0" + The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port. +
metrics.latency.granularity
"operator" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 67444a5397c8b..0e7268ee052cf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -144,6 +144,18 @@ public class MetricOptions { key("metrics.system-resource-probing-interval") .defaultValue(5000L); + /** + * The default network port range for Flink's internal metric query service. The {@code "0"} means that + * Flink searches for a free port. + */ + public static final ConfigOption QUERY_SERVICE_PORT = + key("metrics.internal.query-service.port") + .defaultValue("0") + .withDescription("The port range used for Flink's internal metric query service. Accepts a list of ports " + + "(“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of " + + "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + + "Flink will pick a random port."); + private MetricOptions() { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 462c687447748..00b61737d205b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -118,6 +118,34 @@ public static ActorSystem startActorSystem( String portRangeDefinition, Logger logger, @Nonnull ActorSystemExecutorMode executorMode) throws Exception { + return startActorSystem( + configuration, + AkkaUtils.getFlinkActorSystemName(), + listeningAddress, + portRangeDefinition, + logger, + executorMode); + } + + /** + * Starts an ActorSystem with the given configuration listening at the address/ports. + * + * @param configuration The Flink configuration + * @param actorSystemName Name of the started {@link ActorSystem} + * @param listeningAddress The address to listen at. + * @param portRangeDefinition The port range to choose a port from. + * @param logger The logger to output log information. + * @param executorMode The executor mode of Akka actor system. + * @return The ActorSystem which has been started + * @throws Exception Thrown when actor system cannot be started in specified port range + */ + public static ActorSystem startActorSystem( + Configuration configuration, + String actorSystemName, + String listeningAddress, + String portRangeDefinition, + Logger logger, + @Nonnull ActorSystemExecutorMode executorMode) throws Exception { // parse port range definition and create port iterator Iterator portsIterator; @@ -143,7 +171,13 @@ public static ActorSystem startActorSystem( } try { - return startActorSystem(configuration, listeningAddress, port, logger, executorMode); + return startActorSystem( + configuration, + actorSystemName, + listeningAddress, + port, + logger, + executorMode); } catch (Exception e) { // we can continue to try if this contains a netty channel exception diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index d7e872642c367..ba04af9f4883d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -60,7 +61,7 @@ public class MetricUtils { private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); private static final String METRIC_GROUP_STATUS_NAME = "Status"; - private static final String METRICS = "flink-metrics"; + private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics"; private MetricUtils() { } @@ -121,11 +122,12 @@ public static void instantiateStatusMetrics( } public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception { + final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT); return BootstrapTools.startActorSystem( configuration, - METRICS, + METRICS_ACTOR_SYSTEM_NAME, hostname, - 0, + portRange, logger, FIXED_THREAD_POOL_EXECUTOR); }