This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8905f83d [ISSUE #1081] [Java] Support message filtering in
interceptors with proper acknowledgment handling (#1082)
8905f83d is described below
commit 8905f83daf6f9cb26e1d283d86239ff907d2527d
Author: bcaw-ofeer <[email protected]>
AuthorDate: Thu Aug 28 16:37:30 2025 +0800
[ISSUE #1081] [Java] Support message filtering in interceptors with proper
acknowledgment handling (#1082)
Co-authored-by: xiaoying.ly <[email protected]>
Co-authored-by: terrance.lzm <[email protected]>
---
.../client/apis/consumer/PushConsumerBuilder.java | 9 +++
.../java/impl/consumer/ProcessQueueImpl.java | 64 +++++++++++++++++++---
.../impl/consumer/PushConsumerBuilderImpl.java | 11 +++-
.../java/impl/consumer/PushConsumerImpl.java | 16 +++++-
.../java/impl/consumer/ReceiveMessageResult.java | 5 ++
protos | 1 -
6 files changed, 96 insertions(+), 10 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
index 73677a7d..93f4c67b 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java
@@ -90,6 +90,15 @@ public interface PushConsumerBuilder {
*/
PushConsumerBuilder setEnableFifoConsumeAccelerator(boolean
enableFifoConsumeAccelerator);
+ /**
+ * Enable or disable message interceptor filtering functionality.
+ * When enabled, it supports client-side message filtering by message interceptors.
+ *
+ * @param enableMessageInterceptorFiltering whether to enable message interceptor filtering
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean
enableMessageInterceptorFiltering);
+
/**
* Finalize the build of {@link PushConsumer} and start.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 7a1fc404..6e9146db 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -37,6 +37,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -256,13 +257,62 @@ class ProcessQueueImpl implements ProcessQueue {
new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
consumer.doAfter(context0, generalMessages);
- try {
- onReceiveMessageResult(result);
- } catch (Throwable t) {
- // Should never reach here.
- log.error("[Bug] Exception raised while handling
receive result, mq={}, endpoints={}, "
- + "clientId={}", mq, endpoints, clientId, t);
- onReceiveMessageException(t, attemptId);
+ // Only perform message filtering when enableMessageInterceptorFiltering is enabled.
+ if (consumer.isEnableMessageInterceptorFiltering()) {
+ final List<MessageViewImpl> originalMessages =
+ new ArrayList<>(result.getMessageViewImpls());
+
+ final Set<MessageId> filteredMessageIds =
generalMessages.stream()
+ .filter(msg -> msg.getMessageId().isPresent())
+ .map(msg -> msg.getMessageId().get())
+ .collect(Collectors.toSet());
+
+ final List<MessageViewImpl> filteredOutMessages =
new ArrayList<>();
+ final List<MessageViewImpl> remainingMessages =
new ArrayList<>();
+
+ for (MessageViewImpl originalMsg :
originalMessages) {
+ if
(filteredMessageIds.contains(originalMsg.getMessageId())) {
+ remainingMessages.add(originalMsg);
+ } else {
+ filteredOutMessages.add(originalMsg);
+ }
+ }
+
+ // Ack filtered out messages.
+ if (!filteredOutMessages.isEmpty()) {
+ log.info("Acking {} filtered out messages by
interceptor, mq={}, clientId={}",
+ filteredOutMessages.size(), mq,
consumer.getClientId());
+
+ for (MessageViewImpl filteredOutMsg :
filteredOutMessages) {
+ ListenableFuture<Void> ackFuture =
ackMessage(filteredOutMsg);
+ ackFuture.addListener(() -> {
+ log.debug("Successfully acked filtered
out message, messageId={}, topic={}",
+ filteredOutMsg.getMessageId(),
filteredOutMsg.getTopic());
+ }, MoreExecutors.directExecutor());
+ }
+ }
+
+ try {
+ // Create new ReceiveMessageResult with filtered messages.
+ ReceiveMessageResult filteredResult =
+
ReceiveMessageResult.createFilteredResult(result, remainingMessages);
+ onReceiveMessageResult(filteredResult);
+ } catch (Throwable t) {
+ // Should never reach here.
+ log.error("[Bug] Exception raised while
handling receive result, mq={}, endpoints={}, "
+ + "clientId={}", mq, endpoints, clientId,
t);
+ onReceiveMessageException(t, attemptId);
+ }
+ } else {
+ // When filtering is disabled, use original result directly to avoid performance overhead.
+ try {
+ onReceiveMessageResult(result);
+ } catch (Throwable t) {
+ // Should never reach here.
+ log.error("[Bug] Exception raised while
handling receive result, mq={}, endpoints={}, "
+ + "clientId={}", mq, endpoints, clientId,
t);
+ onReceiveMessageException(t, attemptId);
+ }
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index b9171c18..0d833bfc 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -42,6 +42,7 @@ public class PushConsumerBuilderImpl implements
PushConsumerBuilder {
private int maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
private int consumptionThreadCount = 20;
private boolean enableFifoConsumeAccelerator = false;
+ private boolean enableMessageInterceptorFiltering = false;
/**
* @see PushConsumerBuilder#setClientConfiguration(ClientConfiguration)
@@ -123,6 +124,14 @@ public class PushConsumerBuilderImpl implements
PushConsumerBuilder {
return this;
}
+ /**
+ * @see PushConsumerBuilder#setEnableMessageInterceptorFiltering(boolean)
+ */
+ public PushConsumerBuilder setEnableMessageInterceptorFiltering(boolean
enableMessageInterceptorFiltering) {
+ this.enableMessageInterceptorFiltering =
enableMessageInterceptorFiltering;
+ return this;
+ }
+
/**
* @see PushConsumerBuilder#build()
*/
@@ -134,7 +143,7 @@ public class PushConsumerBuilderImpl implements
PushConsumerBuilder {
checkArgument(!subscriptionExpressions.isEmpty(),
"subscriptionExpressions have not been set yet");
final PushConsumerImpl pushConsumer = new
PushConsumerImpl(clientConfiguration, consumerGroup,
subscriptionExpressions, messageListener, maxCacheMessageCount,
maxCacheMessageSizeInBytes,
- consumptionThreadCount, enableFifoConsumeAccelerator);
+ consumptionThreadCount, enableFifoConsumeAccelerator,
enableMessageInterceptorFiltering);
pushConsumer.startAsync().awaitRunning();
return pushConsumer;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 62c756db..c43c3d27 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -104,6 +104,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
private final int maxCacheMessageCount;
private final int maxCacheMessageSizeInBytes;
private final boolean enableFifoConsumeAccelerator;
+ private final boolean enableMessageInterceptorFiltering;
private final InflightRequestCountInterceptor
inflightRequestCountInterceptor;
/**
@@ -129,6 +130,14 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
Map<String, FilterExpression> subscriptionExpressions, MessageListener
messageListener,
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int
consumptionThreadCount,
boolean enableFifoConsumeAccelerator) {
+ this(clientConfiguration, consumerGroup, subscriptionExpressions,
messageListener, maxCacheMessageCount,
+ maxCacheMessageSizeInBytes, consumptionThreadCount,
enableFifoConsumeAccelerator, false);
+ }
+
+ public PushConsumerImpl(ClientConfiguration clientConfiguration, String
consumerGroup,
+ Map<String, FilterExpression> subscriptionExpressions, MessageListener
messageListener,
+ int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int
consumptionThreadCount,
+ boolean enableFifoConsumeAccelerator, boolean
enableMessageInterceptorFiltering) {
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Resource groupResource = new
Resource(clientConfiguration.getNamespace(), consumerGroup);
@@ -141,6 +150,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
this.maxCacheMessageCount = maxCacheMessageCount;
this.maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
this.enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
+ this.enableMessageInterceptorFiltering =
enableMessageInterceptorFiltering;
this.receptionTimes = new AtomicLong(0);
this.receivedMessagesQuantity = new AtomicLong(0);
@@ -165,7 +175,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
Map<String, FilterExpression> subscriptionExpressions, MessageListener
messageListener,
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int
consumptionThreadCount) {
this(clientConfiguration, consumerGroup, subscriptionExpressions,
messageListener, maxCacheMessageCount,
- maxCacheMessageSizeInBytes, consumptionThreadCount, true);
+ maxCacheMessageSizeInBytes, consumptionThreadCount, true, false);
}
@Override
@@ -626,4 +636,8 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
public ThreadPoolExecutor getConsumptionExecutor() {
return consumptionExecutor;
}
+
+ public boolean isEnableMessageInterceptorFiltering() {
+ return enableMessageInterceptorFiltering;
+ }
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index ab11ff93..0b224858 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -32,6 +32,11 @@ public class ReceiveMessageResult {
this.messages = messages;
}
+ public static ReceiveMessageResult
createFilteredResult(ReceiveMessageResult original,
+ List<MessageViewImpl> filteredMessages) {
+ return new ReceiveMessageResult(original.getEndpoints(), new
ArrayList<>(filteredMessages));
+ }
+
public List<MessageView> getMessageViews() {
return new ArrayList<>(messages);
}
diff --git a/protos b/protos
deleted file mode 160000
index 5c9f8419..00000000
--- a/protos
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19