This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 316d32a00 [ISSUE #3804]Commit consumption offset with specific
MessageQueue. (#3838)
316d32a00 is described below
commit 316d32a006ea49ed806aa965890c917020c426c2
Author: Ni Ze <[email protected]>
AuthorDate: Tue May 24 09:47:27 2022 +0800
[ISSUE #3804]Commit consumption offset with specific MessageQueue. (#3838)
* feat(client) commit consumption offset with specific MessageQueue.
* feat(client) remove exception.
* feat(client) add test
* fix(test) code style is not right
* doc(broker.conf) remove item for test
---
.../client/consumer/DefaultLitePullConsumer.java | 6 +++
.../rocketmq/client/consumer/LitePullConsumer.java | 10 +++--
.../impl/consumer/DefaultLitePullConsumerImpl.java | 44 +++++++++++++------
.../consumer/DefaultLitePullConsumerTest.java | 50 +++++++++++++++++-----
.../rocketmq/example/rpc/RequestProducer.java | 2 +
.../rocketmq/example/rpc/ResponseConsumer.java | 4 +-
6 files changed, 89 insertions(+), 27 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 440e37e49..8c7f0f0b3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
@@ -315,6 +316,11 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
this.defaultLitePullConsumerImpl.commitAll();
}
+ @Override
+ public void commit(final Set<MessageQueue> messageQueues, boolean persist)
{
+ this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
+ }
+
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return
this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index 25b11046f..089df516f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.client.consumer;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
public interface LitePullConsumer {
/**
@@ -160,6 +161,9 @@ public interface LitePullConsumer {
*/
void commitSync();
+
+ void commit(final Set<MessageQueue> messageQueues, boolean persist);
+
/**
* Get the last committed offset for the given message queue.
*
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index d5157501b..11cdbf6fb 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -645,21 +645,39 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
public synchronized void commitAll() {
- try {
- for (MessageQueue messageQueue :
assignedMessageQueue.messageQueues()) {
- long consumerOffset =
assignedMessageQueue.getConsumerOffset(messageQueue);
- if (consumerOffset != -1) {
- ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
- if (processQueue != null && !processQueue.isDropped()) {
- updateConsumeOffset(messageQueue, consumerOffset);
- }
- }
+ for (MessageQueue messageQueue : assignedMessageQueue.messageQueues())
{
+ try {
+ commit(messageQueue);
+ } catch (Exception e) {
+ log.error("An error occurred when update consume offset
Automatically.");
}
- if (defaultLitePullConsumer.getMessageModel() ==
MessageModel.BROADCASTING) {
- offsetStore.persistAll(assignedMessageQueue.messageQueues());
+ }
+ }
+
+ public synchronized void commit(final Set<MessageQueue> messageQueues,
boolean persist) {
+ if (messageQueues == null || messageQueues.size() == 0) {
+ return;
+ }
+
+ for (MessageQueue messageQueue : messageQueues) {
+ commit(messageQueue);
+ }
+
+ if (persist) {
+ this.offsetStore.persistAll(messageQueues);
+ }
+ }
+
+ private synchronized void commit(MessageQueue messageQueue) {
+ long consumerOffset =
assignedMessageQueue.getConsumerOffset(messageQueue);
+
+ if (consumerOffset != -1) {
+ ProcessQueue processQueue =
assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null && !processQueue.isDropped()) {
+ updateConsumeOffset(messageQueue, consumerOffset);
}
- } catch (Exception e) {
- log.error("An error occurred when update consume offset
Automatically.");
+ } else {
+ log.error("consumerOffset is -1 in messageQueue [" + messageQueue
+ "].");
}
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 227ea4417..c452f30ff 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -17,19 +17,11 @@
package org.apache.rocketmq.client.consumer;
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -61,6 +53,16 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
@@ -374,7 +376,8 @@ public class DefaultLitePullConsumerTest {
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
litePullConsumer.registerTopicMessageQueueChangeListener(topic, new
TopicMessageQueueChangeListener() {
- @Override public void onChanged(String topic, Set<MessageQueue>
messageQueues) {
+ @Override
+ public void onChanged(String topic, Set<MessageQueue>
messageQueues) {
flag = true;
}
});
@@ -537,6 +540,33 @@ public class DefaultLitePullConsumerTest {
assertThat(defaultLitePullConsumer.isRunning()).isFalse();
}
+ @Test
+ public void testConsumerCommitWithMQ() throws Exception {
+ DefaultLitePullConsumer litePullConsumer =
createNotStartLitePullConsumer();
+ RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+ litePullConsumer.setOffsetStore(store);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+
+ //replace with real offsetStore.
+ Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+ offsetStore.setAccessible(true);
+ offsetStore.set(litePullConsumerImpl, store);
+
+ MessageQueue messageQueue = createMessageQueue();
+ HashSet<MessageQueue> set = new HashSet<MessageQueue>();
+ set.add(messageQueue);
+
+ //mock assign and reset offset
+ litePullConsumer.assign(set);
+ litePullConsumer.seek(messageQueue, 0);
+
+ //commit
+ litePullConsumer.commit(set, true);
+
+ assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+ }
+
static class AsyncConsumer {
public void executeAsync(final DefaultLitePullConsumer consumer) {
new Thread(new Runnable() {
diff --git
a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
index b34908b84..f200f77da 100644
--- a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -29,6 +29,8 @@ public class RequestProducer {
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+ producer.setNamesrvAddr("127.0.0.1:9876");
+
producer.start();
try {
diff --git
a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
index c62c7d4eb..421297cc3 100644
---
a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
@@ -40,11 +40,13 @@ public class ResponseConsumer {
// create a producer to send reply message
DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
+ replyProducer.setNamesrvAddr("127.0.0.1:9876");
replyProducer.start();
// create consumer
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ consumer.setNamesrvAddr("127.0.0.1:9876");
// recommend client configs
consumer.setPullTimeDelayMillsWhenException(0L);
@@ -55,7 +57,7 @@ public class ResponseConsumer {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
try {
- System.out.printf("handle message: %s",
msg.toString());
+ System.out.printf("handle message: %s %n",
msg.toString());
String replyTo = MessageUtil.getReplyToClient(msg);
byte[] replyContent = "reply message
contents.".getBytes();
// create reply message with given util, do not create
reply message by yourself