This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 6203bf8 Minor, unify thread capacity for scannerThreadPool
6203bf8 is described below
commit 6203bf8389bb4f8d6a95c07b3b9a32c28e8cc9df
Author: XiaoxiangYu <[email protected]>
AuthorDate: Mon Jan 18 18:16:19 2021 +0800
Minor, unify thread capacity for scannerThreadPool
---
.../src/main/java/org/apache/kylin/common/KylinConfigBase.java | 2 +-
.../apache/kylin/stream/core/query/MultiThreadsResultCollector.java | 5 +++--
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2e16f47..c3739b0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2561,7 +2561,7 @@ public abstract class KylinConfigBase implements
Serializable {
}
public int getStreamingReceiverQueryCoreThreads() {
- int def = getStreamingReceiverQueryMaxThreads() - 1;
+ int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
return
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def +
""));
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index b914e79..76a83c7 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -45,7 +45,8 @@ public class MultiThreadsResultCollector extends
ResultCollector {
static {
KylinConfig config = KylinConfig.getInstanceFromEnv();
MAX_RUNNING_THREAD_COUNT =
config.getStreamingReceiverQueryMaxThreads();
- scannerThreadPool = new
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+ int coreThreads = config.getStreamingReceiverQueryCoreThreads();
+ scannerThreadPool = new ThreadPoolExecutor(coreThreads,
MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), new
NamedThreadFactory("query-worker"));
}
@@ -204,7 +205,7 @@ public class MultiThreadsResultCollector extends
ResultCollector {
public static boolean isFullUp() {
boolean occupied = scannerThreadPool.getActiveCount() >=
MAX_RUNNING_THREAD_COUNT;
if (occupied) {
- logger.debug("ThreadPool {}", scannerThreadPool);
+ logger.debug("ThreadPool {} is full .", scannerThreadPool);
}
return occupied;
}