This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new b1fd956e0 [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
b1fd956e0 is described below

commit b1fd956e0c966700b842ff938e690130d65815ee
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Thu Sep 22 19:23:40 2022 +0800

    [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager (#5852)
---
 .../tubemq/manager/service/TopicBackendWorker.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
index aaed365bf..4563a50b0 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java
@@ -17,6 +17,9 @@

 package org.apache.inlong.tubemq.manager.service;

+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.tubemq.manager.repository.TopicRepository;
 import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
@@ -48,6 +51,7 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
             new ConcurrentHashMap<>();
     private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
     private final NodeService nodeService;
+    private final ScheduledExecutorService workerExecutor;

     @Autowired
     private TopicRepository topicRepository;
@@ -66,10 +70,11 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
     private int queueMaxRunningSize;

     TopicBackendWorker() {
-        Thread thread = new Thread(this);
-        // daemon thread
-        thread.setDaemon(true);
-        thread.start();
+        ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+        this.workerExecutor = Executors
+                .newSingleThreadScheduledExecutor(
+                        factoryBuilder.setNameFormat("tubemq-manager-topic-backend-worker").build());
+        workerExecutor.schedule(this, queueThreadInterval, TimeUnit.SECONDS);
         nodeService = new NodeServiceImpl(this);
     }

@@ -121,11 +126,10 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
     @Override
     public void run() {
         log.info("TopicBackendWorker has started");
-        while (runFlag.get()) {
+        if (runFlag.get()) {
             try {
                 batchAddTopic();
                 checkTopicFromDB();
-                TimeUnit.SECONDS.sleep(queueThreadInterval);
             } catch (Exception exception) {
                 log.warn("exception caught", exception);
             }
@@ -136,5 +140,6 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
     public void destroy() throws Exception {
         runFlag.set(false);
         nodeService.close();
+        this.workerExecutor.shutdown();
     }
 }