Skip to content

Commit

Permalink
[FLINK-10253] Run MetricQueryService with lower priority
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored and tillrohrmann committed Oct 17, 2018
1 parent 2c53a17 commit 3932433
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 6 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/metric_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
<td style="word-wrap: break-word;">"0"</td>
<td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td>
</tr>
<tr>
<td><h5>metrics.internal.query-service.thread-priority</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.</td>
</tr>
<tr>
<td><h5>metrics.latency.granularity</h5></td>
<td style="word-wrap: break-word;">"operator"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ public class MetricOptions {
"ports to avoid collisions when multiple Flink components are running on the same machine. Per default " +
"Flink will pick a random port.");

/**
* The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the
* {@code 10} means the max priority.
*/
public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY =
key("metrics.internal.query-service.thread-priority")
.defaultValue(1)
.withDescription("The thread priority used for Flink's internal metric query service. The thread is created" +
" by Akka's thread pool executor. " +
"The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). " +
"Warning, increasing this value may bring the main Flink components down.");

private MetricOptions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio
case FORK_JOIN_EXECUTOR:
return AkkaUtils.getForkJoinExecutorConfig(configuration);
case FIXED_THREAD_POOL_EXECUTOR:
return AkkaUtils.getThreadPoolExecutorConfig();
return AkkaUtils.getThreadPoolExecutorConfig(
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
default:
throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.dispatch

import java.util.concurrent.ThreadFactory

/**
* Composition over the [[DispatcherPrerequisites.threadFactory]] that set priority
* for newly created threads.
*
* @param newThreadPriority priority that will be set to each newly created thread
* should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY
*/
class PriorityThreadFactory(
prerequisites: DispatcherPrerequisites,
newThreadPriority: Int) extends ThreadFactory {
override def newThread(r: Runnable): Thread = {
val newThread = prerequisites.threadFactory.newThread(r)
newThread.setPriority(newThreadPriority)
newThread
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.dispatch

import com.typesafe.config.Config

/**
* Akka Dispatcher that creates thread with configurable priority.
*
* Example of configuration:
*
* low-priority-threads-dispatcher {
* type = akka.dispatch.PriorityThreadsDispatcher
* executor = "thread-pool-executor"
* # should be between Thread.MIN_PRIORITY (which is 1) and Thread.MAX_PRIORITY (which is 10)
* threads-priority = 1
* thread-pool-executor {
* core-pool-size-min = 0
* core-pool-size-factor = 2.0
* core-pool-size-max = 10
* }
* }
*
* Two arguments constructor (the primary constructor) is automatically called by Akka
* when it founds:
* abcde-dispatcher {
* type = akka.dispatch.PriorityThreadsDispatcher <-- the class that Akka will instantiate
* ...
* }
*
* @param config passed automatically by Akka, should contains information about threads priority
* @param prerequisites passed automatically by Akka
*/
class PriorityThreadsDispatcher(config: Config, prerequisites: DispatcherPrerequisites)
extends DispatcherConfigurator(
config,
new PriorityThreadsDispatcherPrerequisites(
prerequisites,
config.getInt(PriorityThreadsDispatcher.threadPriorityConfigKey)
)
)

object PriorityThreadsDispatcher {
/**
* Configuration key under which int value should be placed.
*/
val threadPriorityConfigKey = "thread-priority"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.dispatch

/**
* Composition over [[DefaultDispatcherPrerequisites]] that replaces thread factory with one that
* allow to configure thread priority.
*
* @param newThreadPriority priority that will be set to each newly created thread
* should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY
*/
class PriorityThreadsDispatcherPrerequisites(
prerequisites: DispatcherPrerequisites,
newThreadPriority: Int) extends DispatcherPrerequisites {

private val defaultDispatcherPrerequisites : DefaultDispatcherPrerequisites =
new DefaultDispatcherPrerequisites(
eventStream = prerequisites.eventStream,
scheduler = prerequisites.scheduler,
dynamicAccess = prerequisites.dynamicAccess,
settings = prerequisites.settings,
mailboxes = prerequisites.mailboxes,
defaultExecutionContext = prerequisites.defaultExecutionContext,
threadFactory = new PriorityThreadFactory(prerequisites, newThreadPriority)
)

override def threadFactory : java.util.concurrent.ThreadFactory = {
defaultDispatcherPrerequisites.threadFactory
}

override def eventStream : akka.event.EventStream = {
defaultDispatcherPrerequisites.eventStream
}

override def scheduler : akka.actor.Scheduler = {
defaultDispatcherPrerequisites.scheduler
}

override def dynamicAccess : akka.actor.DynamicAccess = {
defaultDispatcherPrerequisites.dynamicAccess
}

override def settings : akka.actor.ActorSystem.Settings = {
defaultDispatcherPrerequisites.settings
}

override def mailboxes : akka.dispatch.Mailboxes = {
defaultDispatcherPrerequisites.mailboxes
}

override def defaultExecutionContext : scala.Option[scala.concurrent.ExecutionContext] = {
defaultDispatcherPrerequisites.defaultExecutionContext
}
}

object PriorityThreadsDispatcherPrerequisites {
def apply(prerequisites: DispatcherPrerequisites, newThreadPriority: Int):
PriorityThreadsDispatcherPrerequisites =
new PriorityThreadsDispatcherPrerequisites(prerequisites, newThreadPriority)
}


Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.{ask => akkaAsk}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
import org.apache.flink.configuration._
import org.apache.flink.runtime.concurrent.FutureUtils
import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils
Expand Down Expand Up @@ -291,12 +291,21 @@ object AkkaUtils {
ConfigFactory.parseString(config)
}

def getThreadPoolExecutorConfig: Config = {
def getThreadPoolExecutorConfig(threadPriority: Int): Config = {
if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalConfigurationException("The config : " +
MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must between "
+ Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY +
", but the value is " + threadPriority)
}

val configString = s"""
|akka {
| actor {
| default-dispatcher {
| type = akka.dispatch.PriorityThreadsDispatcher
| executor = "thread-pool-executor"
| thread-priority = $threadPriority
| thread-pool-executor {
| core-pool-size-min = 2
| core-pool-size-factor = 2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ package org.apache.flink.runtime.akka

import java.net.{InetAddress, InetSocketAddress}

import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException}
import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, MetricOptions}
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
import org.apache.flink.util.NetUtils
import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.slf4j.LoggerFactory

@RunWith(classOf[JUnitRunner])
class AkkaUtilsTest
Expand Down Expand Up @@ -184,13 +187,44 @@ class AkkaUtilsTest
}

test("getAkkaConfig respects executor config") {
val akkaConfig = AkkaUtils.getAkkaConfig(
var akkaConfig = AkkaUtils.getAkkaConfig(
new Configuration(),
"localhost",
1234,
AkkaUtils.getThreadPoolExecutorConfig)
AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY))

akkaConfig.getString("akka.actor.default-dispatcher.executor") should
equal("thread-pool-executor")

akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
equal(Thread.MIN_PRIORITY)

akkaConfig = AkkaUtils.getAkkaConfig(
new Configuration(),
"localhost",
1234,
AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY))

akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should
equal(Thread.MAX_PRIORITY)
}

test("thread priority for metrics ActorSystem ") {
var actorSystem = MetricUtils.startMetricsActorSystem(
new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
//test default thread priority
val defaultThreadPriority = actorSystem.settings.config.getInt(
"akka.actor.default-dispatcher.thread-priority")
//check default value
assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority)

val config = new Configuration()
config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, Thread.MAX_PRIORITY)
actorSystem = MetricUtils.startMetricsActorSystem(
config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest"))
val threadPriority = actorSystem.settings.config.getInt(
"akka.actor.default-dispatcher.thread-priority")
//check config value
assertEquals(Thread.MAX_PRIORITY, threadPriority)
}
}

0 comments on commit 3932433

Please sign in to comment.