This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 316d32a00 [ISSUE #3804]Commit consumption offset with specific 
MessageQueue. (#3838)
316d32a00 is described below

commit 316d32a006ea49ed806aa965890c917020c426c2
Author: Ni Ze <[email protected]>
AuthorDate: Tue May 24 09:47:27 2022 +0800

    [ISSUE #3804]Commit consumption offset with specific MessageQueue. (#3838)
    
    * feat(client) commit consumption offset with specific MessageQueue.
    
    * feat(client) remove exception.
    
    * feat(client) add test
    
    * fix(test) code style is not right
    
    * doc(broker.conf) remove item for test
---
 .../client/consumer/DefaultLitePullConsumer.java   |  6 +++
 .../rocketmq/client/consumer/LitePullConsumer.java | 10 +++--
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 44 +++++++++++++------
 .../consumer/DefaultLitePullConsumerTest.java      | 50 +++++++++++++++++-----
 .../rocketmq/example/rpc/RequestProducer.java      |  2 +
 .../rocketmq/example/rpc/ResponseConsumer.java     |  4 +-
 6 files changed, 89 insertions(+), 27 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 440e37e49..8c7f0f0b3 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
@@ -315,6 +316,11 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         this.defaultLitePullConsumerImpl.commitAll();
     }
 
+    @Override
+    public void commit(final Set<MessageQueue> messageQueues, boolean persist) 
{
+        this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
+    }
+
     @Override
     public Long committed(MessageQueue messageQueue) throws MQClientException {
         return 
this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 25b11046f..089df516f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -16,13 +16,14 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
 public interface LitePullConsumer {
 
     /**
@@ -160,6 +161,9 @@ public interface LitePullConsumer {
      */
     void commitSync();
 
+
+    void commit(final Set<MessageQueue> messageQueues, boolean persist);
+
     /**
      * Get the last committed offset for the given message queue.
      *
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index d5157501b..11cdbf6fb 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -645,21 +645,39 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     public synchronized void commitAll() {
-        try {
-            for (MessageQueue messageQueue : 
assignedMessageQueue.messageQueues()) {
-                long consumerOffset = 
assignedMessageQueue.getConsumerOffset(messageQueue);
-                if (consumerOffset != -1) {
-                    ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
-                    if (processQueue != null && !processQueue.isDropped()) {
-                        updateConsumeOffset(messageQueue, consumerOffset);
-                    }
-                }
+        for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) 
{
+            try {
+                commit(messageQueue);
+            } catch (Exception e) {
+                log.error("An error occurred when update consume offset 
Automatically.");
             }
-            if (defaultLitePullConsumer.getMessageModel() == 
MessageModel.BROADCASTING) {
-                offsetStore.persistAll(assignedMessageQueue.messageQueues());
+        }
+    }
+
+    public synchronized void commit(final Set<MessageQueue> messageQueues, 
boolean persist) {
+        if (messageQueues == null || messageQueues.size() == 0) {
+            return;
+        }
+
+        for (MessageQueue messageQueue : messageQueues) {
+            commit(messageQueue);
+        }
+
+        if (persist) {
+            this.offsetStore.persistAll(messageQueues);
+        }
+    }
+
+    private synchronized void commit(MessageQueue messageQueue) {
+        long consumerOffset = 
assignedMessageQueue.getConsumerOffset(messageQueue);
+
+        if (consumerOffset != -1) {
+            ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
+            if (processQueue != null && !processQueue.isDropped()) {
+                updateConsumeOffset(messageQueue, consumerOffset);
             }
-        } catch (Exception e) {
-            log.error("An error occurred when update consume offset 
Automatically.");
+        } else {
+            log.error("consumerOffset is -1 in messageQueue [" + messageQueue 
+ "].");
         }
     }
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 227ea4417..c452f30ff 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -17,19 +17,11 @@
 
 package org.apache.rocketmq.client.consumer;
 
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -61,6 +53,16 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
 import static org.junit.Assert.assertEquals;
@@ -374,7 +376,8 @@ public class DefaultLitePullConsumerTest {
         
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
         litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
         litePullConsumer.registerTopicMessageQueueChangeListener(topic, new 
TopicMessageQueueChangeListener() {
-            @Override public void onChanged(String topic, Set<MessageQueue> 
messageQueues) {
+            @Override
+            public void onChanged(String topic, Set<MessageQueue> 
messageQueues) {
                 flag = true;
             }
         });
@@ -537,6 +540,33 @@ public class DefaultLitePullConsumerTest {
         assertThat(defaultLitePullConsumer.isRunning()).isFalse();
     }
 
+    @Test
+    public void testConsumerCommitWithMQ() throws Exception {
+        DefaultLitePullConsumer litePullConsumer = 
createNotStartLitePullConsumer();
+        RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+        litePullConsumer.setOffsetStore(store);
+        litePullConsumer.start();
+        initDefaultLitePullConsumer(litePullConsumer);
+
+        //replace with real offsetStore.
+        Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+        offsetStore.setAccessible(true);
+        offsetStore.set(litePullConsumerImpl, store);
+
+        MessageQueue messageQueue = createMessageQueue();
+        HashSet<MessageQueue> set = new HashSet<MessageQueue>();
+        set.add(messageQueue);
+
+        //mock assign and reset offset
+        litePullConsumer.assign(set);
+        litePullConsumer.seek(messageQueue, 0);
+
+        //commit
+        litePullConsumer.commit(set, true);
+
+        assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+    }
+
     static class AsyncConsumer {
         public void executeAsync(final DefaultLitePullConsumer consumer) {
             new Thread(new Runnable() {
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java 
b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
index b34908b84..f200f77da 100644
--- a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -29,6 +29,8 @@ public class RequestProducer {
         long ttl = 3000;
 
         DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+
         producer.start();
 
         try {
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java 
b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
index c62c7d4eb..421297cc3 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
@@ -40,11 +40,13 @@ public class ResponseConsumer {
 
         // create a producer to send reply message
         DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
+        replyProducer.setNamesrvAddr("127.0.0.1:9876");
         replyProducer.start();
 
         // create consumer
         DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer(consumerGroup);
         
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        consumer.setNamesrvAddr("127.0.0.1:9876");
 
         // recommend client configs
         consumer.setPullTimeDelayMillsWhenException(0L);
@@ -55,7 +57,7 @@ public class ResponseConsumer {
                 System.out.printf("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(), msgs);
                 for (MessageExt msg : msgs) {
                     try {
-                        System.out.printf("handle message: %s", 
msg.toString());
+                        System.out.printf("handle message: %s %n", 
msg.toString());
                         String replyTo = MessageUtil.getReplyToClient(msg);
                         byte[] replyContent = "reply message 
contents.".getBytes();
                         // create reply message with given util, do not create 
reply message by yourself

Reply via email to