This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 06e22b4b42 [ISSUE #7765] Fix unit test testEstimateLag
06e22b4b42 is described below

commit 06e22b4b423bafd8e3f46d555e659fb72370e4f3
Author: landonchan90 <150651782+landoncha...@users.noreply.github.com>
AuthorDate: Wed Jan 31 17:39:30 2024 +0800

    [ISSUE #7765] Fix unit test testEstimateLag
    
    Co-authored-by: landonchan90 <landaonc...@163.com>
---
 .../java/org/apache/rocketmq/test/offset/LagCalculationIT.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
index 0be18a9d33..ad521440e9 100644
--- a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -171,6 +172,13 @@ public class LagCalculationIT extends BaseConf {
         RMQSqlConsumer sqlConsumer = 
ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic, 
selector, sqlListener);
         RMQBlockListener tagListener = new RMQBlockListener(true);
         RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, 
tagListener);
+
+        //init subscriptionData & consumerFilterData for sql
+        SubscriptionData subscriptionData = 
FilterAPI.buildSubscriptionData(topic, sql, ExpressionType.SQL92);
+        for (BrokerController controller : brokerControllerList) {
+            controller.getConsumerFilterManager().register(topic, 
sqlConsumer.getConsumerGroup(), sql, ExpressionType.SQL92, 
subscriptionData.getSubVersion());
+        }
+
         // wait for building filter data
         await().atMost(5, TimeUnit.SECONDS).until(() -> 
sqlListener.isBlocked() && tagListener.isBlocked());
 
@@ -210,7 +218,6 @@ public class LagCalculationIT extends BaseConf {
             for (MessageQueue mq : mqs) {
                 if 
(mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
                     long brokerOffset = 
controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
-                    SubscriptionData subscriptionData = 
controller.getConsumerManager().findSubscriptionData(sqlConsumer.getConsumerGroup(),
 topic);
                     ConsumerFilterData consumerFilterData = 
controller.getConsumerFilterManager().get(topic, 
sqlConsumer.getConsumerGroup());
                     long estimateMessageCount = controller.getMessageStore()
                         .estimateMessageCount(topic, mq.getQueueId(), 0, 
brokerOffset,

Reply via email to