This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 6bbd2514 [ISSUE #1060] [Java] Enhance the logic of MessageInterceptor
(#1061)
6bbd2514 is described below
commit 6bbd2514584c233dbf28eee740385dd5912c10db
Author: bcaw-ofeer <[email protected]>
AuthorDate: Thu Sep 4 10:03:16 2025 +0800
[ISSUE #1060] [Java] Enhance the logic of MessageInterceptor (#1061)
Co-authored-by: xiaoying.ly <[email protected]>
---
.../java/hook/CompositedMessageInterceptor.java | 15 ++++++++---
.../client/java/impl/consumer/ConsumeTask.java | 20 +++++++++++++++
.../client/java/impl/producer/ProducerImpl.java | 30 ++++++++++++++++++----
.../client/java/rpc/LoggingInterceptor.java | 17 ++++++++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 8 ++++--
5 files changed, 79 insertions(+), 11 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
index 7b29faa7..99926866 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/CompositedMessageInterceptor.java
@@ -39,11 +39,15 @@ public class CompositedMessageInterceptor implements
MessageInterceptor {
@Override
public void doBefore(MessageInterceptorContext context0,
List<GeneralMessage> messages) {
final HashMap<Integer, Map<AttributeKey, Attribute>> attributeMap =
new HashMap<>();
+ final MessageHookPoints messageHookPoints =
context0.getMessageHookPoints();
+ final MessageHookPointsStatus status = context0.getStatus();
+ MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status);
+
for (int index = 0; index < interceptors.size(); index++) {
MessageInterceptor interceptor = interceptors.get(index);
- final MessageHookPoints messageHookPoints =
context0.getMessageHookPoints();
- final MessageHookPointsStatus status = context0.getStatus();
- final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status);
+ if (context0 instanceof MessageInterceptorContextImpl) {
+ ((MessageInterceptorContextImpl)
context0).getAttributes().forEach(context::putAttribute);
+ }
try {
interceptor.doBefore(context, messages);
} catch (Throwable t) {
@@ -63,8 +67,11 @@ public class CompositedMessageInterceptor implements
MessageInterceptor {
final Map<AttributeKey, Attribute> attributes =
attributeMap.get(index);
final MessageHookPoints messageHookPoints =
context0.getMessageHookPoints();
final MessageHookPointsStatus status = context0.getStatus();
- final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status,
+ MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(messageHookPoints, status,
attributes);
+ if (context0 instanceof MessageInterceptorContextImpl) {
+ ((MessageInterceptorContextImpl)
context0).getAttributes().forEach(context::putAttribute);
+ }
MessageInterceptor interceptor = interceptors.get(index);
try {
interceptor.doAfter(context, messages);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index b1f5c7f2..19cea3e0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.concurrent.Callable;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.java.hook.Attribute;
+import org.apache.rocketmq.client.java.hook.AttributeKey;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
@@ -30,10 +32,15 @@ import
org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.message.GeneralMessageImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.rpc.LoggingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumeTask implements Callable<ConsumeResult> {
+ static final AttributeKey<MessageViewImpl> MESSAGE_VIEW_CONTEXT_KEY =
AttributeKey.create("messageView");
+ static final AttributeKey<String> REMOTE_ADDR_CONTEXT_KEY =
AttributeKey.create("remoteAddr");
+ static final AttributeKey<Throwable> CONSUME_ERROR_CONTEXT_KEY =
AttributeKey.create("consumeError");
+
private static final Logger log =
LoggerFactory.getLogger(ConsumeTask.class);
private final ClientId clientId;
@@ -59,7 +66,15 @@ public class ConsumeTask implements Callable<ConsumeResult> {
ConsumeResult consumeResult;
final List<GeneralMessage> generalMessages =
Collections.singletonList(new GeneralMessageImpl(messageView));
MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
+
+ // Add remoteAddr to context.
+ String remoteAddr = LoggingInterceptor.getInstance().getRemoteAddr();
+ context.putAttribute(REMOTE_ADDR_CONTEXT_KEY,
Attribute.create(remoteAddr));
+ // Add message view to context.
+ context.putAttribute(MESSAGE_VIEW_CONTEXT_KEY,
Attribute.create(messageView));
+
messageInterceptor.doBefore(context, generalMessages);
+ Throwable throwable = null;
try {
consumeResult = messageListener.consume(messageView);
} catch (Throwable t) {
@@ -67,10 +82,15 @@ public class ConsumeTask implements Callable<ConsumeResult>
{
"messageId={}", clientId, messageView.getMessageQueue(),
messageView.getMessageId(), t);
// If exception was thrown during the period of message
consumption, mark it as failure.
consumeResult = ConsumeResult.FAILURE;
+ throwable = t;
}
MessageHookPointsStatus status =
ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
MessageHookPointsStatus.ERROR;
context = new MessageInterceptorContextImpl(context, status);
+ if (!ConsumeResult.SUCCESS.equals(consumeResult) && null != throwable)
{
+ // Add consume error to context.
+ context.putAttribute(CONSUME_ERROR_CONTEXT_KEY,
Attribute.create(throwable));
+ }
messageInterceptor.doAfter(context, generalMessages);
// Make sure that the return value is the subset of messageViews.
return consumeResult;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1945639b..7c0ddad1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -66,6 +66,8 @@ import
org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.hook.Attribute;
+import org.apache.rocketmq.client.java.hook.AttributeKey;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContextImpl;
@@ -81,6 +83,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteData;
+import org.apache.rocketmq.client.java.rpc.LoggingInterceptor;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +95,11 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
class ProducerImpl extends ClientImpl implements Producer {
+ static final AttributeKey<String> REMOTE_ADDR_CONTEXT_KEY =
AttributeKey.create("remoteAddr");
+ static final AttributeKey<List<SendReceiptImpl>> SEND_RECEIPTS_CONTEXT_KEY
= AttributeKey.create("sendReceipts");
+ static final AttributeKey<Throwable> SEND_EXCEPTION_CONTEXT_KEY =
AttributeKey.create("sendException");
+ static final AttributeKey<MessageType> MESSAGE_TYPE_CONTEXT_KEY =
AttributeKey.create("messageType");
+
private static final Logger log =
LoggerFactory.getLogger(ProducerImpl.class);
protected final PublishingSettings publishingSettings;
@@ -474,7 +482,14 @@ class ProducerImpl extends ClientImpl implements Producer {
// Intercept before message publishing.
final List<GeneralMessage> generalMessages =
messages.stream().map((Function<PublishingMessageImpl,
GeneralMessage>)
GeneralMessageImpl::new).collect(Collectors.toList());
- final MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
+ MessageInterceptorContextImpl context = new
MessageInterceptorContextImpl(MessageHookPoints.SEND);
+
+ // Add message type to context.
+ context.putAttribute(MESSAGE_TYPE_CONTEXT_KEY,
Attribute.create(messageType));
+ // Add remoteAddr to context.
+ String remoteAddr = LoggingInterceptor.getInstance().getRemoteAddr();
+ context.putAttribute(REMOTE_ADDR_CONTEXT_KEY,
Attribute.create(remoteAddr));
+
doBefore(context, generalMessages);
Futures.addCallback(future, new
FutureCallback<List<SendReceiptImpl>>() {
@@ -488,15 +503,18 @@ class ProducerImpl extends ClientImpl implements Producer
{
future0.setException(e);
// Intercept after message publishing.
- final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
+ // Add send exception to context.
+ context0.putAttribute(SEND_EXCEPTION_CONTEXT_KEY,
Attribute.create(e));
doAfter(context0, generalMessages);
-
return;
}
// Intercept after message publishing.
- final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
+ // Add send receipts to context.
+ context0.putAttribute(SEND_RECEIPTS_CONTEXT_KEY,
Attribute.create(sendReceipts));
doAfter(context0, generalMessages);
// No need more attempts.
@@ -518,8 +536,10 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public void onFailure(Throwable t) {
// Intercept after message publishing.
- final MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
+ MessageInterceptorContextImpl context0 = new
MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
+ // Add send exception to context.
+ context0.putAttribute(SEND_EXCEPTION_CONTEXT_KEY,
Attribute.create(t));
doAfter(context0, generalMessages);
// Collect messageId(s) for logging.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index 45261401..57a61252 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -17,12 +17,14 @@
package org.apache.rocketmq.client.java.rpc;
+import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
+import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.UUID;
@@ -36,6 +38,11 @@ public class LoggingInterceptor implements ClientInterceptor
{
private static final Logger log =
LoggerFactory.getLogger(LoggingInterceptor.class);
private static final LoggingInterceptor INSTANCE = new
LoggingInterceptor();
+ private String remoteAddr = "";
+
+ public String getRemoteAddr() {
+ return remoteAddr;
+ }
public static LoggingInterceptor getInstance() {
return INSTANCE;
@@ -70,6 +77,16 @@ public class LoggingInterceptor implements ClientInterceptor
{
+ "authority={}, headers={}", rpcId,
serviceName, methodName, authority, headers);
super.onHeaders(headers);
}
+
+ @Override
+ public void onReady() {
+ Attributes attributes = getAttributes();
+ Object address =
attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ String remoteAddrStr = address != null ?
address.toString() : "";
+ remoteAddr = remoteAddrStr.startsWith("/") ?
remoteAddrStr.substring(1) : remoteAddrStr;
+ super.onReady();
+ }
+
};
super.start(observabilityListener, headers);
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 8b74d04b..7f80f155 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -232,8 +232,12 @@ public class TestBase {
}
protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
- return
MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
- .setPermission(Permission.READ_WRITE).build();
+ return MessageQueue.newBuilder()
+ .setTopic(topicResource)
+ .setBroker(fakePbBroker0())
+ .setPermission(Permission.READ_WRITE)
+ .addAcceptMessageTypes(MessageType.NORMAL)
+ .build();
}
protected MessageQueue fakePbMessageQueue1(Resource topicResource) {