This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch refactor
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/refactor by this push:
     new de152dd  pref: optimize the response speed of the query api (#273)
de152dd is described below

commit de152dd6f3ae81136154a0fb1547cb83ce8be4f9
Author: Xu Yichi <110229037+crazylyc...@users.noreply.github.com>
AuthorDate: Thu Mar 27 12:22:50 2025 +0800

    pref: optimize the response speed of the query api (#273)
---
 .../dashboard/service/impl/MessageServiceImpl.java |  19 ++-
 .../dashboard/service/impl/TopicServiceImpl.java   |  40 +++++--
 .../support/AutoCloseConsumerWrapper.java          | 132 +++++++++++++++++++++
 .../util/AutoCloseConsumerWrapperTests.java        |  84 +++++++++++++
 4 files changed, 256 insertions(+), 19 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
index 16d0d4e..0586447 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
 import org.apache.rocketmq.remoting.protocol.body.Connection;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
@@ -127,11 +128,11 @@ public class MessageServiceImpl implements MessageService {
         if (isEnableAcl) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
         }
-        DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
+        AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
+        DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
         List<MessageView> messageViewList = Lists.newArrayList();
         try {
             String subExpression = "*";
-            consumer.start();
             Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
             for (MessageQueue mq : mqs) {
                 long minOffset = consumer.searchOffset(mq, begin);
@@ -188,8 +189,6 @@ public class MessageServiceImpl implements MessageService {
         } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
-        } finally {
-            consumer.shutdown();
         }
     }
 
@@ -263,7 +262,8 @@ public class MessageServiceImpl implements MessageService {
         if (isEnableAcl) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
         }
-        DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
+        AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
+        DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
 
         long total = 0;
         List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
@@ -271,7 +271,6 @@ public class MessageServiceImpl implements MessageService {
         List<MessageView> messageViews = new ArrayList<>();
 
         try {
-            consumer.start();
             Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic());
             int idx = 0;
             for (MessageQueue messageQueue : messageQueues) {
@@ -394,8 +393,6 @@ public class MessageServiceImpl implements MessageService {
         } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
-        } finally {
-            consumer.shutdown();
         }
     }
 
@@ -405,14 +402,14 @@ public class MessageServiceImpl implements MessageService {
         if (isEnableAcl) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
         }
-        DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS());
+        AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper();
+        DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS());
         List<MessageView> messageViews = new ArrayList<>();
 
         long offset = query.getPageNum() * query.getPageSize();
 
         long total = 0;
         try {
-            consumer.start();
             for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
                 long start = queueOffsetInfo.getStart();
                 long end = queueOffsetInfo.getEnd();
@@ -462,8 +459,6 @@ public class MessageServiceImpl implements MessageService {
         } catch (Exception e) {
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
-        } finally {
-            consumer.shutdown();
         }
     }
 
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 4f34fc6..3f4bf9a 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -73,6 +73,10 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
 @Service
 public class TopicServiceImpl extends AbstractCommonService implements TopicService {
 
+    private transient DefaultMQProducer systemTopicProducer;
+
+    private final Object producerLock = new Object();
+
     @Autowired
     private RMQConfigure configure;
 
@@ -297,18 +301,40 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
         if (isEnableAcl) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
         }
-        DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
-        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
-        producer.setNamesrvAddr(configure.getNamesrvAddr());
+
+        // ensures thread safety
+        if (systemTopicProducer == null) {
+            synchronized (producerLock) {
+                if (systemTopicProducer == null) {
+                    systemTopicProducer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
+                    systemTopicProducer.setInstanceName("SystemTopicProducer-" + System.currentTimeMillis());
+                    systemTopicProducer.setNamesrvAddr(configure.getNamesrvAddr());
+                    try {
+                        systemTopicProducer.start();
+                    } catch (Exception e) {
+                        systemTopicProducer = null;
+                        Throwables.throwIfUnchecked(e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
 
         try {
-            producer.start();
-            return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
+            return systemTopicProducer.getDefaultMQProducerImpl()
+                    .getmQClientFactory()
+                    .getMQClientAPIImpl()
+                    .getSystemTopicList(20000L);
         } catch (Exception e) {
+            // If the call fails, close and clean up the producer, and it will be re-created next time.
+            synchronized (producerLock) {
+                if (systemTopicProducer != null) {
+                    systemTopicProducer.shutdown();
+                    systemTopicProducer = null;
+                }
+            }
             Throwables.throwIfUnchecked(e);
             throw new RuntimeException(e);
-        } finally {
-            producer.shutdown();
         }
     }
 
diff --git a/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java
new file mode 100644
index 0000000..6f2b52e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.dashboard.support;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AutoCloseConsumerWrapper {
+
+    private final Logger logger = LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class);
+
+    private static final AtomicReference<DefaultMQPullConsumer> CONSUMER_REF = new AtomicReference<>();
+    private final AtomicBoolean isTaskScheduled = new AtomicBoolean(false);
+    private final AtomicBoolean isClosing = new AtomicBoolean(false);
+    private static volatile Instant lastUsedTime = Instant.now();
+
+
+    private static final ScheduledExecutorService SCHEDULER =
+            Executors.newSingleThreadScheduledExecutor();
+
+    public AutoCloseConsumerWrapper() {
+        startIdleCheckTask();
+    }
+
+
+    public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS)  {
+        lastUsedTime = Instant.now();
+
+        DefaultMQPullConsumer consumer = CONSUMER_REF.get();
+        if (consumer == null) {
+            synchronized (this) {
+                consumer = CONSUMER_REF.get();
+                if (consumer == null) {
+                    consumer = createNewConsumer(rpcHook,useTLS);
+                    CONSUMER_REF.set(consumer);
+                }
+                try {
+                    consumer.start();
+                } catch (MQClientException e) {
+                    consumer.shutdown();
+                    CONSUMER_REF.set(null);
+                    throw new RuntimeException("Failed to start consumer", e);
+
+                }
+            }
+        }
+        return consumer;
+    }
+
+
+    protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
+        return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {{
+            setUseTLS(useTLS);
+        }};
+    }
+
+    private void startIdleCheckTask() {
+        if (!isTaskScheduled.get()) {
+            synchronized (this) {
+                if (!isTaskScheduled.get()) {
+                    SCHEDULER.scheduleWithFixedDelay(() -> {
+                        try {
+                            checkAndCloseIdleConsumer();
+                        } catch (Exception e) {
+                            logger.error("Idle check failed", e);
+                        }
+                    }, 1, 1, TimeUnit.MINUTES);
+
+                    isTaskScheduled.set(true);
+                }
+            }
+        }
+    }
+
+    public void checkAndCloseIdleConsumer() {
+        if (shouldClose()) {
+            synchronized (this) {
+                if (shouldClose()) {
+                    close();
+                }
+            }
+        }
+    }
+
+    private boolean shouldClose() {
+        long idleTimeoutMs = 60_000;
+        return CONSUMER_REF.get() != null &&
+                Duration.between(lastUsedTime, Instant.now()).toMillis() > idleTimeoutMs;
+    }
+
+
+    public void close() {
+        if (isClosing.compareAndSet(false, true)) {
+            try {
+                DefaultMQPullConsumer consumer = CONSUMER_REF.getAndSet(null);
+                if (consumer != null) {
+                    consumer.shutdown();
+                }
+                isTaskScheduled.set(false);
+            } finally {
+                isClosing.set(false);
+            }
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java
new file mode 100644
index 0000000..ddd1533
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.dashboard.util;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
+import org.apache.rocketmq.remoting.RPCHook;
+import java.lang.reflect.Field;
+import static org.mockito.Mockito.mock;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.time.Instant;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class AutoCloseConsumerWrapperTests {
+
+    private static class TestableWrapper extends AutoCloseConsumerWrapper {
+        private DefaultMQPullConsumer mockConsumer = mock(DefaultMQPullConsumer.class);
+
+        @Override
+        protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) {
+            return mockConsumer;
+        }
+    }
+
+    @Test
+    void shouldReuseConsumerInstance() throws Exception {
+        TestableWrapper wrapper = new TestableWrapper();
+
+        DefaultMQPullConsumer first = wrapper.getConsumer(mock(RPCHook.class), true);
+        assertNotNull(first);
+
+        DefaultMQPullConsumer second = wrapper.getConsumer(mock(RPCHook.class), true);
+        assertSame(first, second);
+    }
+
+    @Test
+    void shouldHandleStartFailure() throws Exception {
+        TestableWrapper wrapper = new TestableWrapper();
+        doThrow(new MQClientException("Simulated error", null))
+                .when(wrapper.mockConsumer).start();
+
+        assertThrows(RuntimeException.class, () ->
+                wrapper.getConsumer(mock(RPCHook.class), true));
+
+        verify(wrapper.mockConsumer).shutdown();
+    }
+
+
+
+    @Test
+    void shouldCloseIdleConsumer() throws Exception {
+        TestableWrapper wrapper = new TestableWrapper();
+
+        wrapper.getConsumer(mock(RPCHook.class), true);
+
+        Field lastUsedTime = AutoCloseConsumerWrapper.class.getDeclaredField("lastUsedTime");
+        lastUsedTime.setAccessible(true);
+        lastUsedTime.set(wrapper, Instant.now().minusSeconds(70));
+
+        wrapper.checkAndCloseIdleConsumer();
+
+        verify(wrapper.mockConsumer).shutdown();
+    }
+}

Reply via email to