[ https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653532#comment-16653532 ]
ASF GitHub Bot commented on FLINK-10253: ---------------------------------------- asfgit closed pull request #6839: [FLINK-10253] Run MetricQueryService with lower priority URL: https://github.com/apache/flink/pull/6839 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html index 5c073f3ad59..420bb7f1da2 100644 --- a/docs/_includes/generated/metric_configuration.html +++ b/docs/_includes/generated/metric_configuration.html @@ -12,6 +12,11 @@ <td style="word-wrap: break-word;">"0"</td> <td>The port range used for Flink's internal metric query service. Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Flink components are running on the same machine. Per default Flink will pick a random port.</td> </tr> + <tr> + <td><h5>metrics.internal.query-service.thread-priority</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). 
Warning, increasing this value may bring the main Flink components down.</td> + </tr> <tr> <td><h5>metrics.latency.granularity</h5></td> <td style="word-wrap: break-word;">"operator"</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 0e7268ee052..0785b347335 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -156,6 +156,18 @@ "ports to avoid collisions when multiple Flink components are running on the same machine. Per default " + "Flink will pick a random port."); + /** + * The thread priority for Flink's internal metric query service. The {@code 1} means the min priority and the + * {@code 10} means the max priority. + */ + public static final ConfigOption<Integer> QUERY_SERVICE_THREAD_PRIORITY = + key("metrics.internal.query-service.thread-priority") + .defaultValue(1) + .withDescription("The thread priority used for Flink's internal metric query service. The thread is created" + + " by Akka's thread pool executor. " + + "The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). 
" + + "Warning, increasing this value may bring the main Flink components down."); + private MetricOptions() { } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 00b61737d20..430af98bc2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -290,7 +291,8 @@ private static Config getExecutorConfigByExecutorMode(Configuration configuratio case FORK_JOIN_EXECUTOR: return AkkaUtils.getForkJoinExecutorConfig(configuration); case FIXED_THREAD_POOL_EXECUTOR: - return AkkaUtils.getThreadPoolExecutorConfig(); + return AkkaUtils.getThreadPoolExecutorConfig( + configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY)); default: throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode)); } diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala new file mode 100644 index 00000000000..d6f6d76ec51 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadFactory.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +import java.util.concurrent.ThreadFactory + +/** + * Composition over the [[DispatcherPrerequisites.threadFactory]] that set priority + * for newly created threads. + * + * @param newThreadPriority priority that will be set to each newly created thread + * should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY + */ +class PriorityThreadFactory( + prerequisites: DispatcherPrerequisites, + newThreadPriority: Int) extends ThreadFactory { + override def newThread(r: Runnable): Thread = { + val newThread = prerequisites.threadFactory.newThread(r) + newThread.setPriority(newThreadPriority) + newThread + } +} diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala new file mode 100644 index 00000000000..06ef001f1a7 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcher.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +import com.typesafe.config.Config + +/** + * Akka Dispatcher that creates threads with configurable priority. + * + * Example of configuration: + * + * low-priority-threads-dispatcher { + * type = akka.dispatch.PriorityThreadsDispatcher + * executor = "thread-pool-executor" + * # should be between Thread.MIN_PRIORITY (which is 1) and Thread.MAX_PRIORITY (which is 10) + * thread-priority = 1 + * thread-pool-executor { + * core-pool-size-min = 0 + * core-pool-size-factor = 2.0 + * core-pool-size-max = 10 + * } + * } + * + * The two-argument constructor (the primary constructor) is automatically called by Akka + * when it finds: + * abcde-dispatcher { + * type = akka.dispatch.PriorityThreadsDispatcher <-- the class that Akka will instantiate + * ... + * } + * + * @param config passed automatically by Akka, should contain information about the thread priority + * @param prerequisites passed automatically by Akka + */ +class PriorityThreadsDispatcher(config: Config, prerequisites: DispatcherPrerequisites) + extends DispatcherConfigurator( + config, + new PriorityThreadsDispatcherPrerequisites( + prerequisites, + config.getInt(PriorityThreadsDispatcher.threadPriorityConfigKey) + ) + ) + +object PriorityThreadsDispatcher { + /** + * Configuration key under which int value should be placed. 
+ */ + val threadPriorityConfigKey = "thread-priority" +} diff --git a/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala new file mode 100644 index 00000000000..a62f12185f6 --- /dev/null +++ b/flink-runtime/src/main/scala/akka/dispatch/PriorityThreadsDispatcherPrerequisites.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package akka.dispatch + +/** + * Composition over [[DefaultDispatcherPrerequisites]] that replaces thread factory with one that + * allow to configure thread priority. 
+ * + * @param newThreadPriority priority that will be set to each newly created thread + * should be between Thread.MIN_PRIORITY and Thread.MAX_PRIORITY + */ +class PriorityThreadsDispatcherPrerequisites( + prerequisites: DispatcherPrerequisites, + newThreadPriority: Int) extends DispatcherPrerequisites { + + private val defaultDispatcherPrerequisites : DefaultDispatcherPrerequisites = + new DefaultDispatcherPrerequisites( + eventStream = prerequisites.eventStream, + scheduler = prerequisites.scheduler, + dynamicAccess = prerequisites.dynamicAccess, + settings = prerequisites.settings, + mailboxes = prerequisites.mailboxes, + defaultExecutionContext = prerequisites.defaultExecutionContext, + threadFactory = new PriorityThreadFactory(prerequisites, newThreadPriority) + ) + + override def threadFactory : java.util.concurrent.ThreadFactory = { + defaultDispatcherPrerequisites.threadFactory + } + + override def eventStream : akka.event.EventStream = { + defaultDispatcherPrerequisites.eventStream + } + + override def scheduler : akka.actor.Scheduler = { + defaultDispatcherPrerequisites.scheduler + } + + override def dynamicAccess : akka.actor.DynamicAccess = { + defaultDispatcherPrerequisites.dynamicAccess + } + + override def settings : akka.actor.ActorSystem.Settings = { + defaultDispatcherPrerequisites.settings + } + + override def mailboxes : akka.dispatch.Mailboxes = { + defaultDispatcherPrerequisites.mailboxes + } + + override def defaultExecutionContext : scala.Option[scala.concurrent.ExecutionContext] = { + defaultDispatcherPrerequisites.defaultExecutionContext + } +} + +object PriorityThreadsDispatcherPrerequisites { + def apply(prerequisites: DispatcherPrerequisites, newThreadPriority: Int): + PriorityThreadsDispatcherPrerequisites = + new PriorityThreadsDispatcherPrerequisites(prerequisites, newThreadPriority) +} + + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index de2f35ef8fd..80c17cc7bfd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -26,7 +26,7 @@ import akka.actor._ import akka.pattern.{ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions} +import org.apache.flink.configuration._ import org.apache.flink.runtime.concurrent.FutureUtils import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.NetUtils @@ -291,12 +291,20 @@ object AkkaUtils { ConfigFactory.parseString(config) } - def getThreadPoolExecutorConfig: Config = { + def getThreadPoolExecutorConfig(threadPriority: Int): Config = { + if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) { + throw new IllegalConfigurationException("The threadPriority must be between " + + Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY + + ", but it is " + threadPriority) + } + val configString = s""" |akka { | actor { | default-dispatcher { + | type = akka.dispatch.PriorityThreadsDispatcher | executor = "thread-pool-executor" + | thread-priority = $threadPriority | thread-pool-executor { | core-pool-size-min = 2 | core-pool-size-factor = 2.0 diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index e5c1668df0a..ae6209abdd9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -20,14 +20,17 @@ package org.apache.flink.runtime.akka import java.net.{InetAddress, InetSocketAddress} -import 
org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException} +import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, MetricOptions} import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution +import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol import org.apache.flink.util.NetUtils +import org.junit.Assert.assertEquals import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import org.slf4j.LoggerFactory @RunWith(classOf[JUnitRunner]) class AkkaUtilsTest @@ -184,13 +187,44 @@ class AkkaUtilsTest } test("getAkkaConfig respects executor config") { - val akkaConfig = AkkaUtils.getAkkaConfig( + var akkaConfig = AkkaUtils.getAkkaConfig( new Configuration(), "localhost", 1234, - AkkaUtils.getThreadPoolExecutorConfig) + AkkaUtils.getThreadPoolExecutorConfig(Thread.MIN_PRIORITY)) akkaConfig.getString("akka.actor.default-dispatcher.executor") should equal("thread-pool-executor") + + akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should + equal(Thread.MIN_PRIORITY) + + akkaConfig = AkkaUtils.getAkkaConfig( + new Configuration(), + "localhost", + 1234, + AkkaUtils.getThreadPoolExecutorConfig(Thread.MAX_PRIORITY)) + + akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority") should + equal(Thread.MAX_PRIORITY) + } + + test("thread priority for metrics ActorSystem ") { + var actorSystem = MetricUtils.startMetricsActorSystem( + new Configuration, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) + //test default thread priority + val defaultThreadPriority = actorSystem.settings.config.getInt( + "akka.actor.default-dispatcher.thread-priority") + //check default value + assertEquals(Thread.MIN_PRIORITY, defaultThreadPriority) 
+ + val config = new Configuration() + config.setInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY, Thread.MAX_PRIORITY) + actorSystem = MetricUtils.startMetricsActorSystem( + config, "localhost", LoggerFactory.getLogger("AkkaUtilsTest")) + val threadPriority = actorSystem.settings.config.getInt( + "akka.actor.default-dispatcher.thread-priority") + //check config value + assertEquals(Thread.MAX_PRIORITY, threadPriority) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Run MetricQueryService with lower priority > ------------------------------------------ > > Key: FLINK-10253 > URL: https://issues.apache.org/jira/browse/FLINK-10253 > Project: Flink > Issue Type: Sub-task > Components: Metrics > Affects Versions: 1.5.3, 1.6.0, 1.7.0 > Reporter: Till Rohrmann > Assignee: vinoyang > Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > We should run the {{MetricQueryService}} with a lower priority than the main > Flink components. An idea would be to start the underlying threads with a > lower priority. -- This message was sent by Atlassian JIRA (v7.6.3#76005)