This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bbd2514 [ISSUE #1060] [Java] Enhance the logic of MessageInterceptor 
(#1061)
6bbd2514 is described below

commit 6bbd2514584c233dbf28eee740385dd5912c10db
Author: bcaw-ofeer <[email protected]>
AuthorDate: Thu Sep 4 10:03:16 2025 +0800

    [ISSUE #1060] [Java] Enhance the logic of MessageInterceptor (#1061)
    
    Co-authored-by: xiaoying.ly <[email protected]>
---
 .../java/hook/CompositedMessageInterceptor.java    | 15 ++++++++---
 .../client/java/impl/consumer/ConsumeTask.java     | 20 +++++++++++++++
 .../client/java/impl/producer/ProducerImpl.java    | 30 ++++++++++++++++++----
 .../client/java/rpc/LoggingInterceptor.java        | 17 ++++++++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |  8 ++++--
 5 files changed, 79 insertions(+), 11 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
index 7b29faa7..99926866 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
@@ -39,11 +39,15 @@ public class CompositedMessageInterceptor implements 
MessageInterceptor {
     @Override
     public void doBefore(MessageInterceptorContext context0, 
List<GeneralMessage> messages) {
         final HashMap<Integer, Map<AttributeKey, Attribute>> attributeMap = 
new HashMap<>();
+        final MessageHookPoints messageHookPoints = 
context0.getMessageHookPoints();
+        final MessageHookPointsStatus status = context0.getStatus();
+        MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status);
+
         for (int index = 0; index < interceptors.size(); index++) {
             MessageInterceptor interceptor = interceptors.get(index);
-            final MessageHookPoints messageHookPoints = 
context0.getMessageHookPoints();
-            final MessageHookPointsStatus status = context0.getStatus();
-            final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status);
+            if (context0 instanceof MessageInterceptorContextImpl) {
+                ((MessageInterceptorContextImpl) 
context0).getAttributes().forEach(context::putAttribute);
+            }
             try {
                 interceptor.doBefore(context, messages);
             } catch (Throwable t) {
@@ -63,8 +67,11 @@ public class CompositedMessageInterceptor implements 
MessageInterceptor {
             final Map<AttributeKey, Attribute> attributes = 
attributeMap.get(index);
             final MessageHookPoints messageHookPoints = 
context0.getMessageHookPoints();
             final MessageHookPointsStatus status = context0.getStatus();
-            final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status,
+            MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(messageHookPoints, status,
                 attributes);
+            if (context0 instanceof MessageInterceptorContextImpl) {
+                ((MessageInterceptorContextImpl) 
context0).getAttributes().forEach(context::putAttribute);
+            }
             MessageInterceptor interceptor = interceptors.get(index);
             try {
                 interceptor.doAfter(context, messages);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index b1f5c7f2..19cea3e0 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.Attribute;
+import org.apache.rocketmq.client.java.hook.AttributeKey;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
@@ -30,10 +32,15 @@ import 
org.apache.rocketmq.client.java.message.GeneralMessage;
 import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.rpc.LoggingInterceptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConsumeTask implements Callable<ConsumeResult> {
+    static final AttributeKey<MessageViewImpl> MESSAGE_VIEW_CONTEXT_KEY = 
AttributeKey.create("messageView");
+    static final AttributeKey<String> REMOTE_ADDR_CONTEXT_KEY = 
AttributeKey.create("remoteAddr");
+    static final AttributeKey<Throwable> CONSUME_ERROR_CONTEXT_KEY = 
AttributeKey.create("consumeError");
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumeTask.class);
 
     private final ClientId clientId;
@@ -59,7 +66,15 @@ public class ConsumeTask implements Callable<ConsumeResult> {
         ConsumeResult consumeResult;
         final List<GeneralMessage> generalMessages = 
Collections.singletonList(new GeneralMessageImpl(messageView));
         MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
+
+        // Add remoteAddr to context.
+        String remoteAddr = LoggingInterceptor.getInstance().getRemoteAddr();
+        context.putAttribute(REMOTE_ADDR_CONTEXT_KEY, 
Attribute.create(remoteAddr));
+        // Add message view to context.
+        context.putAttribute(MESSAGE_VIEW_CONTEXT_KEY, 
Attribute.create(messageView));
+
         messageInterceptor.doBefore(context, generalMessages);
+        Throwable throwable = null;
         try {
             consumeResult = messageListener.consume(messageView);
         } catch (Throwable t) {
@@ -67,10 +82,15 @@ public class ConsumeTask implements Callable<ConsumeResult> 
{
                 "messageId={}", clientId, messageView.getMessageQueue(), 
messageView.getMessageId(), t);
             // If exception was thrown during the period of message 
consumption, mark it as failure.
             consumeResult = ConsumeResult.FAILURE;
+            throwable = t;
         }
         MessageHookPointsStatus status = 
ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
             MessageHookPointsStatus.ERROR;
         context = new MessageInterceptorContextImpl(context, status);
+        if (!ConsumeResult.SUCCESS.equals(consumeResult) && null != throwable) 
{
+            // Add consume error to context.
+            context.putAttribute(CONSUME_ERROR_CONTEXT_KEY, 
Attribute.create(throwable));
+        }
         messageInterceptor.doAfter(context, generalMessages);
         // Make sure that the return value is the subset of messageViews.
         return consumeResult;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1945639b..7c0ddad1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -66,6 +66,8 @@ import 
org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.StatusChecker;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.hook.Attribute;
+import org.apache.rocketmq.client.java.hook.AttributeKey;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
@@ -81,6 +83,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
+import org.apache.rocketmq.client.java.rpc.LoggingInterceptor;
 import org.apache.rocketmq.client.java.rpc.RpcFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +95,11 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 class ProducerImpl extends ClientImpl implements Producer {
+    static final AttributeKey<String>  REMOTE_ADDR_CONTEXT_KEY = 
AttributeKey.create("remoteAddr");
+    static final AttributeKey<List<SendReceiptImpl>> SEND_RECEIPTS_CONTEXT_KEY 
= AttributeKey.create("sendReceipts");
+    static final AttributeKey<Throwable> SEND_EXCEPTION_CONTEXT_KEY = 
AttributeKey.create("sendException");
+    static final AttributeKey<MessageType> MESSAGE_TYPE_CONTEXT_KEY = 
AttributeKey.create("messageType");
+
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
 
     protected final PublishingSettings publishingSettings;
@@ -474,7 +482,14 @@ class ProducerImpl extends ClientImpl implements Producer {
         // Intercept before message publishing.
         final List<GeneralMessage> generalMessages = 
messages.stream().map((Function<PublishingMessageImpl,
             GeneralMessage>) 
GeneralMessageImpl::new).collect(Collectors.toList());
-        final MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.SEND);
+        MessageInterceptorContextImpl context = new 
MessageInterceptorContextImpl(MessageHookPoints.SEND);
+
+        // Add message type to context.
+        context.putAttribute(MESSAGE_TYPE_CONTEXT_KEY, 
Attribute.create(messageType));
+        // Add remoteAddr to context.
+        String remoteAddr = LoggingInterceptor.getInstance().getRemoteAddr();
+        context.putAttribute(REMOTE_ADDR_CONTEXT_KEY, 
Attribute.create(remoteAddr));
+
         doBefore(context, generalMessages);
 
         Futures.addCallback(future, new 
FutureCallback<List<SendReceiptImpl>>() {
@@ -488,15 +503,18 @@ class ProducerImpl extends ClientImpl implements Producer 
{
                     future0.setException(e);
 
                     // Intercept after message publishing.
-                    final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
+                    MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                         MessageHookPointsStatus.ERROR);
+                    // Add send exception to context.
+                    context0.putAttribute(SEND_EXCEPTION_CONTEXT_KEY, 
Attribute.create(e));
                     doAfter(context0, generalMessages);
-
                     return;
                 }
                 // Intercept after message publishing.
-                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.OK);
+                // Add send receipts to context.
+                context0.putAttribute(SEND_RECEIPTS_CONTEXT_KEY, 
Attribute.create(sendReceipts));
                 doAfter(context0, generalMessages);
 
                 // No need more attempts.
@@ -518,8 +536,10 @@ class ProducerImpl extends ClientImpl implements Producer {
             @Override
             public void onFailure(Throwable t) {
                 // Intercept after message publishing.
-                final MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
+                MessageInterceptorContextImpl context0 = new 
MessageInterceptorContextImpl(context,
                     MessageHookPointsStatus.ERROR);
+                // Add send exception to context.
+                context0.putAttribute(SEND_EXCEPTION_CONTEXT_KEY, 
Attribute.create(t));
                 doAfter(context0, generalMessages);
 
                 // Collect messageId(s) for logging.
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index 45261401..57a61252 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -17,12 +17,14 @@
 
 package org.apache.rocketmq.client.java.rpc;
 
+import io.grpc.Attributes;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
 import io.grpc.ClientInterceptor;
 import io.grpc.ForwardingClientCall;
 import io.grpc.ForwardingClientCallListener;
+import io.grpc.Grpc;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import java.util.UUID;
@@ -36,6 +38,11 @@ public class LoggingInterceptor implements ClientInterceptor 
{
     private static final Logger log = 
LoggerFactory.getLogger(LoggingInterceptor.class);
 
     private static final LoggingInterceptor INSTANCE = new 
LoggingInterceptor();
+    private String remoteAddr = "";
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
 
     public static LoggingInterceptor getInstance() {
         return INSTANCE;
@@ -70,6 +77,16 @@ public class LoggingInterceptor implements ClientInterceptor 
{
                                 + "authority={}, headers={}", rpcId, 
serviceName, methodName, authority, headers);
                             super.onHeaders(headers);
                         }
+
+                        @Override
+                        public void onReady() {
+                            Attributes attributes = getAttributes();
+                            Object address = 
attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+                            String remoteAddrStr = address != null ? 
address.toString() : "";
+                            remoteAddr = remoteAddrStr.startsWith("/") ? 
remoteAddrStr.substring(1) : remoteAddrStr;
+                            super.onReady();
+                        }
+
                     };
                 super.start(observabilityListener, headers);
             }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 8b74d04b..7f80f155 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -232,8 +232,12 @@ public class TestBase {
     }
 
     protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
-        return 
MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
-            .setPermission(Permission.READ_WRITE).build();
+        return MessageQueue.newBuilder()
+            .setTopic(topicResource)
+            .setBroker(fakePbBroker0())
+            .setPermission(Permission.READ_WRITE)
+            .addAcceptMessageTypes(MessageType.NORMAL)
+            .build();
     }
 
     protected MessageQueue fakePbMessageQueue1(Resource topicResource) {

Reply via email to