This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 06e22b4b42 [ISSUE #7765] Fix unit test testEstimateLag 06e22b4b42 is described below commit 06e22b4b423bafd8e3f46d555e659fb72370e4f3 Author: landonchan90 <150651782+landoncha...@users.noreply.github.com> AuthorDate: Wed Jan 31 17:39:30 2024 +0800 [ISSUE #7765] Fix unit test testEstimateLag Co-authored-by: landonchan90 <landaonc...@163.com> --- .../java/org/apache/rocketmq/test/offset/LagCalculationIT.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java index 0be18a9d33..ad521440e9 100644 --- a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -171,6 +172,13 @@ public class LagCalculationIT extends BaseConf { RMQSqlConsumer sqlConsumer = ConsumerFactory.getRMQSqlConsumer(NAMESRV_ADDR, initConsumerGroup(), topic, selector, sqlListener); RMQBlockListener tagListener = new RMQBlockListener(true); RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, tagListener); + + //init subscriptionData & consumerFilterData for sql + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, sql, ExpressionType.SQL92); + for (BrokerController controller : brokerControllerList) { + controller.getConsumerFilterManager().register(topic, sqlConsumer.getConsumerGroup(), sql, ExpressionType.SQL92, subscriptionData.getSubVersion()); + } + // wait for building filter data await().atMost(5, TimeUnit.SECONDS).until(() -> sqlListener.isBlocked() && tagListener.isBlocked()); @@ -210,7 +218,6 @@ public class LagCalculationIT extends BaseConf { for (MessageQueue mq : mqs) { if (mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) { long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId()); - SubscriptionData subscriptionData = controller.getConsumerManager().findSubscriptionData(sqlConsumer.getConsumerGroup(), topic); ConsumerFilterData consumerFilterData = controller.getConsumerFilterManager().get(topic, sqlConsumer.getConsumerGroup()); long estimateMessageCount = controller.getMessageStore() .estimateMessageCount(topic, mq.getQueueId(), 0, brokerOffset,