Skip to content

Commit

Permalink
[FLINK-10247] Introduce port range option for Flink's metrics query s…
Browse files Browse the repository at this point in the history
…ervice

In order to make the port used for Flink's internal metrics query service configurable,
this commits adds the metrics.internal.query-service.port config option. This option
specifies a port range from which the metric query service picks a free one to bind to.
Per default, a random port is picked.

This closes apache#6759.
  • Loading branch information
tillrohrmann committed Oct 10, 2018
1 parent 20b080e commit 13256d1
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 4 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/metric_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>metrics.internal.query-service.port</h5></td>
<td style="word-wrap: break-word;">"0"</td>
<td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td>
</tr>
<tr>
<td><h5>metrics.latency.granularity</h5></td>
<td style="word-wrap: break-word;">"operator"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ public class MetricOptions {
key("metrics.system-resource-probing-interval")
.defaultValue(5000L);

/**
* The default network port range for Flink's internal metric query service. The {@code "0"} means that
* Flink searches for a free port.
*/
public static final ConfigOption<String> QUERY_SERVICE_PORT =
key("metrics.internal.query-service.port")
.defaultValue("0")
.withDescription("The port range used for Flink's internal metric query service. Accepts a list of ports " +
"(“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of " +
"ports to avoid collisions when multiple Flink components are running on the same machine. Per default " +
"Flink will pick a random port.");

private MetricOptions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,34 @@ public static ActorSystem startActorSystem(
String portRangeDefinition,
Logger logger,
@Nonnull ActorSystemExecutorMode executorMode) throws Exception {
return startActorSystem(
configuration,
AkkaUtils.getFlinkActorSystemName(),
listeningAddress,
portRangeDefinition,
logger,
executorMode);
}

/**
* Starts an ActorSystem with the given configuration listening at the address/ports.
*
* @param configuration The Flink configuration
* @param actorSystemName Name of the started {@link ActorSystem}
* @param listeningAddress The address to listen at.
* @param portRangeDefinition The port range to choose a port from.
* @param logger The logger to output log information.
* @param executorMode The executor mode of Akka actor system.
* @return The ActorSystem which has been started
* @throws Exception Thrown when actor system cannot be started in specified port range
*/
public static ActorSystem startActorSystem(
Configuration configuration,
String actorSystemName,
String listeningAddress,
String portRangeDefinition,
Logger logger,
@Nonnull ActorSystemExecutorMode executorMode) throws Exception {

// parse port range definition and create port iterator
Iterator<Integer> portsIterator;
Expand All @@ -143,7 +171,13 @@ public static ActorSystem startActorSystem(
}

try {
return startActorSystem(configuration, listeningAddress, port, logger, executorMode);
return startActorSystem(
configuration,
actorSystemName,
listeningAddress,
port,
logger,
executorMode);
}
catch (Exception e) {
// we can continue to try if this contains a netty channel exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
Expand Down Expand Up @@ -60,7 +61,7 @@
public class MetricUtils {
private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class);
private static final String METRIC_GROUP_STATUS_NAME = "Status";
private static final String METRICS = "flink-metrics";
private static final String METRICS_ACTOR_SYSTEM_NAME = "flink-metrics";

private MetricUtils() {
}
Expand Down Expand Up @@ -121,11 +122,12 @@ public static void instantiateStatusMetrics(
}

public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception {
final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
return BootstrapTools.startActorSystem(
configuration,
METRICS,
METRICS_ACTOR_SYSTEM_NAME,
hostname,
0,
portRange,
logger,
FIXED_THREAD_POOL_EXECUTOR);
}
Expand Down

0 comments on commit 13256d1

Please sign in to comment.