This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch refactor in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/refactor by this push: new de152dd pref: optimize the response speed of the query api (#273) de152dd is described below commit de152dd6f3ae81136154a0fb1547cb83ce8be4f9 Author: Xu Yichi <110229037+crazylyc...@users.noreply.github.com> AuthorDate: Thu Mar 27 12:22:50 2025 +0800 pref: optimize the response speed of the query api (#273) --- .../dashboard/service/impl/MessageServiceImpl.java | 19 ++- .../dashboard/service/impl/TopicServiceImpl.java | 40 +++++-- .../support/AutoCloseConsumerWrapper.java | 132 +++++++++++++++++++++ .../util/AutoCloseConsumerWrapperTests.java | 84 +++++++++++++ 4 files changed, 256 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java index 16d0d4e..0586447 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper; import org.apache.rocketmq.remoting.protocol.body.Connection; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; @@ -127,11 +128,11 @@ public class MessageServiceImpl implements MessageService { if (isEnableAcl) { rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); + AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper(); + DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS()); List<MessageView> messageViewList = Lists.newArrayList(); try { String subExpression = "*"; - consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic); for (MessageQueue mq : mqs) { long minOffset = consumer.searchOffset(mq, begin); @@ -188,8 +189,6 @@ public class MessageServiceImpl implements MessageService { } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); - } finally { - consumer.shutdown(); } } @@ -263,7 +262,8 @@ public class MessageServiceImpl implements MessageService { if (isEnableAcl) { rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); + AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper(); + DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS()); long total = 0; List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>(); @@ -271,7 +271,6 @@ public class MessageServiceImpl implements MessageService { List<MessageView> messageViews = new ArrayList<>(); try { - consumer.start(); Collection<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(query.getTopic()); int idx = 0; for (MessageQueue messageQueue : messageQueues) { @@ -394,8 +393,6 @@ public class MessageServiceImpl implements MessageService { } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); - } finally { - consumer.shutdown(); } } @@ -405,14 +402,14 @@ public class MessageServiceImpl implements MessageService { if (isEnableAcl) { rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook, configure.isUseTLS()); + AutoCloseConsumerWrapper consumerWrapper = new AutoCloseConsumerWrapper(); + DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook, configure.isUseTLS()); List<MessageView> messageViews = new ArrayList<>(); long offset = query.getPageNum() * query.getPageSize(); long total = 0; try { - consumer.start(); for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) { long start = queueOffsetInfo.getStart(); long end = queueOffsetInfo.getEnd(); @@ -462,8 +459,6 @@ public class MessageServiceImpl implements MessageService { } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); - } finally { - consumer.shutdown(); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java index 4f34fc6..3f4bf9a 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java @@ -73,6 +73,10 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR @Service public class TopicServiceImpl extends AbstractCommonService implements TopicService { + private transient DefaultMQProducer systemTopicProducer; + + private final Object producerLock = new Object(); + @Autowired private RMQConfigure configure; @@ -297,18 +301,40 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ if (isEnableAcl) { rpcHook = new AclClientRPCHook(new SessionCredentials(configure.getAccessKey(), configure.getSecretKey())); } - DefaultMQProducer producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook); - producer.setInstanceName(String.valueOf(System.currentTimeMillis())); - producer.setNamesrvAddr(configure.getNamesrvAddr()); + + // ensures thread safety + if (systemTopicProducer == null) { + synchronized (producerLock) { + if (systemTopicProducer == null) { + systemTopicProducer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook); + systemTopicProducer.setInstanceName("SystemTopicProducer-" + System.currentTimeMillis()); + systemTopicProducer.setNamesrvAddr(configure.getNamesrvAddr()); + try { + systemTopicProducer.start(); + } catch (Exception e) { + systemTopicProducer = null; + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + } + } try { - producer.start(); - return producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L); + return systemTopicProducer.getDefaultMQProducerImpl() + .getmQClientFactory() + .getMQClientAPIImpl() + .getSystemTopicList(20000L); } catch (Exception e) { + // If the call fails, close and clean up the producer, and it will be re-created next time. + synchronized (producerLock) { + if (systemTopicProducer != null) { + systemTopicProducer.shutdown(); + systemTopicProducer = null; + } + } Throwables.throwIfUnchecked(e); throw new RuntimeException(e); - } finally { - producer.shutdown(); } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java new file mode 100644 index 0000000..6f2b52e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.dashboard.support; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class AutoCloseConsumerWrapper { + + private final Logger logger = LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class); + + private static final AtomicReference<DefaultMQPullConsumer> CONSUMER_REF = new AtomicReference<>(); + private final AtomicBoolean isTaskScheduled = new AtomicBoolean(false); + private final AtomicBoolean isClosing = new AtomicBoolean(false); + private static volatile Instant lastUsedTime = Instant.now(); + + + private static final ScheduledExecutorService SCHEDULER = + Executors.newSingleThreadScheduledExecutor(); + + public AutoCloseConsumerWrapper() { + startIdleCheckTask(); + } + + + public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS) { + lastUsedTime = Instant.now(); + + DefaultMQPullConsumer consumer = CONSUMER_REF.get(); + if (consumer == null) { + synchronized (this) { + consumer = CONSUMER_REF.get(); + if (consumer == null) { + consumer = createNewConsumer(rpcHook,useTLS); + CONSUMER_REF.set(consumer); + } + try { + consumer.start(); + } catch (MQClientException e) { + consumer.shutdown(); + CONSUMER_REF.set(null); + throw new RuntimeException("Failed to start consumer", e); + + } + } + } + return consumer; + } + + + protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) { + return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook) {{ + setUseTLS(useTLS); + }}; + } + + private void startIdleCheckTask() { + if (!isTaskScheduled.get()) { + synchronized (this) { + if (!isTaskScheduled.get()) { + SCHEDULER.scheduleWithFixedDelay(() -> { + try { + checkAndCloseIdleConsumer(); + } catch (Exception e) { + logger.error("Idle check failed", e); + } + }, 1, 1, TimeUnit.MINUTES); + + isTaskScheduled.set(true); + } + } + } + } + + public void checkAndCloseIdleConsumer() { + if (shouldClose()) { + synchronized (this) { + if (shouldClose()) { + close(); + } + } + } + } + + private boolean shouldClose() { + long idleTimeoutMs = 60_000; + return CONSUMER_REF.get() != null && + Duration.between(lastUsedTime, Instant.now()).toMillis() > idleTimeoutMs; + } + + + public void close() { + if (isClosing.compareAndSet(false, true)) { + try { + DefaultMQPullConsumer consumer = CONSUMER_REF.getAndSet(null); + if (consumer != null) { + consumer.shutdown(); + } + isTaskScheduled.set(false); + } finally { + isClosing.set(false); + } + } + } + +} diff --git a/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java new file mode 100644 index 0000000..ddd1533 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.dashboard.util; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper; +import org.apache.rocketmq.remoting.RPCHook; +import java.lang.reflect.Field; +import static org.mockito.Mockito.mock; +import org.apache.rocketmq.client.exception.MQClientException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Instant; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class AutoCloseConsumerWrapperTests { + + private static class TestableWrapper extends AutoCloseConsumerWrapper { + private DefaultMQPullConsumer mockConsumer = mock(DefaultMQPullConsumer.class); + + @Override + protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean useTLS) { + return mockConsumer; + } + } + + @Test + void shouldReuseConsumerInstance() throws Exception { + TestableWrapper wrapper = new TestableWrapper(); + + DefaultMQPullConsumer first = wrapper.getConsumer(mock(RPCHook.class), true); + assertNotNull(first); + + DefaultMQPullConsumer second = wrapper.getConsumer(mock(RPCHook.class), true); + assertSame(first, second); + } + + @Test + void shouldHandleStartFailure() throws Exception { + TestableWrapper wrapper = new TestableWrapper(); + doThrow(new MQClientException("Simulated error", null)) + .when(wrapper.mockConsumer).start(); + + assertThrows(RuntimeException.class, () -> + wrapper.getConsumer(mock(RPCHook.class), true)); + + verify(wrapper.mockConsumer).shutdown(); + } + + + + @Test + void shouldCloseIdleConsumer() throws Exception { + TestableWrapper wrapper = new TestableWrapper(); + + wrapper.getConsumer(mock(RPCHook.class), true); + + Field lastUsedTime = AutoCloseConsumerWrapper.class.getDeclaredField("lastUsedTime"); + lastUsedTime.setAccessible(true); + lastUsedTime.set(wrapper, Instant.now().minusSeconds(70)); + + wrapper.checkAndCloseIdleConsumer(); + + verify(wrapper.mockConsumer).shutdown(); + } +}