This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bbbe737e4e [ISSUE #6964] use the correct context in telemetry; polish
the code structure (#6965)
bbbe737e4e is described below
commit bbbe737e4e57ebc32581220fa8766cf32f7833eb
Author: lk <[email protected]>
AuthorDate: Thu Jun 29 15:27:30 2023 +0800
[ISSUE #6964] use the correct context in telemetry; polish the code
structure (#6965)
---
.../proxy/grpc/v2/ContextStreamObserver.java | 29 +++++++++++
.../proxy/grpc/v2/DefaultGrpcMessingActivity.java | 5 +-
.../proxy/grpc/v2/GrpcMessagingApplication.java | 6 +--
.../proxy/grpc/v2/GrpcMessingActivity.java | 2 +-
.../proxy/grpc/v2/client/ClientActivity.java | 18 ++++---
.../grpc/v2/common/GrpcClientSettingsManager.java | 22 ++++----
.../rocketmq/proxy/processor/ClientProcessor.java | 2 +-
.../proxy/processor/DefaultMessagingProcessor.java | 4 +-
.../proxy/processor/MessagingProcessor.java | 2 +-
.../remoting/activity/ClientManagerActivity.java | 12 ++---
.../remoting/activity/ConsumerManagerActivity.java | 4 +-
.../remoting/activity/PullMessageActivity.java | 2 +-
.../remoting/channel/RemotingChannelManager.java | 9 ++--
.../proxy/service/route/TopicRouteService.java | 60 ++++------------------
.../proxy/grpc/v2/client/ClientActivityTest.java | 16 +++---
.../v2/common/GrpcClientSettingsManagerTest.java | 8 +--
.../remoting/activity/PullMessageActivityTest.java | 4 +-
.../channel/RemotingChannelManagerTest.java | 30 ++++++-----
.../protocol/body/LockBatchRequestBody.java | 11 ++++
.../protocol/body/UnlockBatchRequestBody.java | 11 ++++
.../protocol/header/NotificationRequestHeader.java | 14 +++++
.../header/QueryConsumerOffsetRequestHeader.java | 11 ++++
22 files changed, 160 insertions(+), 122 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
new file mode 100644
index 0000000000..c186bfb61c
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.grpc.v2;
+
+import org.apache.rocketmq.proxy.common.ProxyContext;
+
+public interface ContextStreamObserver<V> {
+
+ void onNext(ProxyContext ctx, V value);
+
+ void onError(Throwable t);
+
+ void onCompleted();
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index 9d49e0e2ca..73b764bc4f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -150,8 +150,7 @@ public class DefaultGrpcMessingActivity extends
AbstractStartAndShutdown impleme
}
@Override
- public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
- StreamObserver<TelemetryCommand> responseObserver) {
- return this.clientActivity.telemetry(ctx, responseObserver);
+ public ContextStreamObserver<TelemetryCommand>
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
+ return this.clientActivity.telemetry(responseObserver);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 32395322a3..2cb395ad60 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -378,17 +378,17 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
@Override
public StreamObserver<TelemetryCommand>
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status ->
TelemetryCommand.newBuilder().setStatus(status).build();
- ProxyContext context = createContext();
- StreamObserver<TelemetryCommand> responseTelemetryCommand =
grpcMessingActivity.telemetry(context, responseObserver);
+ ContextStreamObserver<TelemetryCommand> responseTelemetryCommand =
grpcMessingActivity.telemetry(responseObserver);
return new StreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand value) {
+ ProxyContext context = createContext();
try {
validateContext(context);
addExecutor(clientManagerThreadPoolExecutor,
context,
value,
- () -> responseTelemetryCommand.onNext(value),
+ () -> responseTelemetryCommand.onNext(context, value),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
index 8f1db82307..77bd3a88f9 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
@@ -69,5 +69,5 @@ public interface GrpcMessingActivity extends StartAndShutdown
{
CompletableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(ProxyContext ctx,
ChangeInvisibleDurationRequest request);
- StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
StreamObserver<TelemetryCommand> responseObserver);
+ ContextStreamObserver<TelemetryCommand>
telemetry(StreamObserver<TelemetryCommand> responseObserver);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index a60228eb9f..8553289498 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -174,11 +175,10 @@ public class ClientActivity extends
AbstractMessingActivity {
return future;
}
- public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
- StreamObserver<TelemetryCommand> responseObserver) {
- return new StreamObserver<TelemetryCommand>() {
+ public ContextStreamObserver<TelemetryCommand>
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
+ return new ContextStreamObserver<TelemetryCommand>() {
@Override
- public void onNext(TelemetryCommand request) {
+ public void onNext(ProxyContext ctx, TelemetryCommand request) {
try {
switch (request.getCommandCase()) {
case SETTINGS: {
@@ -271,7 +271,7 @@ public class ClientActivity extends AbstractMessingActivity
{
protected TelemetryCommand processClientSettings(ProxyContext ctx,
TelemetryCommand request) {
String clientId = ctx.getClientID();
- grpcClientSettingsManager.updateClientSettings(clientId,
request.getSettings());
+ grpcClientSettingsManager.updateClientSettings(ctx, clientId,
request.getSettings());
Settings settings = grpcClientSettingsManager.getClientSettings(ctx);
return TelemetryCommand.newBuilder()
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK,
Code.OK.name()))
@@ -458,7 +458,11 @@ public class ClientActivity extends
AbstractMessingActivity {
if (settings == null) {
return;
}
-
grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(),
settings);
+ grpcClientSettingsManager.updateClientSettings(
+ ProxyContext.createForInner(this.getClass()),
+ clientChannelInfo.getClientId(),
+ settings
+ );
}
}
}
@@ -475,7 +479,7 @@ public class ClientActivity extends AbstractMessingActivity
{
public void handle(ProducerGroupEvent event, String group,
ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
-
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+
grpcClientSettingsManager.removeAndGetRawClientSettings(clientChannelInfo.getClientId());
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index af8b4546e1..1eff659392 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -33,15 +33,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.MetricCollectorMode;
import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -68,7 +67,7 @@ public class GrpcClientSettingsManager extends ServiceThread
implements StartAnd
public Settings getClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
- Settings settings = CLIENT_SETTINGS_MAP.get(clientId);
+ Settings settings = getRawClientSettings(clientId);
if (settings == null) {
return null;
}
@@ -182,7 +181,7 @@ public class GrpcClientSettingsManager extends
ServiceThread implements StartAnd
.build();
}
- public void updateClientSettings(String clientId, Settings settings) {
+ public void updateClientSettings(ProxyContext ctx, String clientId,
Settings settings) {
if (settings.hasSubscription()) {
settings =
createDefaultConsumerSettingsBuilder().mergeFrom(settings).build();
}
@@ -194,17 +193,13 @@ public class GrpcClientSettingsManager extends
ServiceThread implements StartAnd
.toBuilder();
}
- public void removeClientSettings(String clientId) {
- CLIENT_SETTINGS_MAP.remove(clientId);
- }
-
- public void computeIfPresent(String clientId, Function<Settings, Settings>
function) {
- CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) ->
function.apply(value));
+ public Settings removeAndGetRawClientSettings(String clientId) {
+ return CLIENT_SETTINGS_MAP.remove(clientId);
}
public Settings removeAndGetClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
- Settings settings = CLIENT_SETTINGS_MAP.remove(clientId);
+ Settings settings = this.removeAndGetRawClientSettings(clientId);
if (settings == null) {
return null;
}
@@ -237,7 +232,10 @@ public class GrpcClientSettingsManager extends
ServiceThread implements StartAnd
return settings;
}
String consumerGroup =
GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
- ConsumerGroupInfo consumerGroupInfo =
this.messagingProcessor.getConsumerGroupInfo(consumerGroup);
+ ConsumerGroupInfo consumerGroupInfo =
this.messagingProcessor.getConsumerGroupInfo(
+ ProxyContext.createForInner(this.getClass()),
+ consumerGroup
+ );
if (consumerGroupInfo == null ||
consumerGroupInfo.findChannel(clientId) == null) {
log.info("remove unused grpc client settings.
group:{}, settings:{}", consumerGroupInfo, settings);
return null;
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 8fb6eaf7df..eeb9bf87e6 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -110,7 +110,7 @@ public class ClientProcessor extends AbstractProcessor {
this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener);
}
- public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
+ public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String
consumerGroup) {
return
this.serviceManager.getConsumerManager().getConsumerGroupInfo(consumerGroup);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 72ff9b939d..e663ae1ba2 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -290,8 +290,8 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
}
@Override
- public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
- return this.clientProcessor.getConsumerGroupInfo(consumerGroup);
+ public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String
consumerGroup) {
+ return this.clientProcessor.getConsumerGroupInfo(ctx, consumerGroup);
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 40ffb96a7a..263068965a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -288,7 +288,7 @@ public interface MessagingProcessor extends
StartAndShutdown {
void doChannelCloseEvent(String remoteAddr, Channel channel);
- ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup);
+ ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String
consumerGroup);
void addTransactionSubscription(
ProxyContext ctx,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 69280fb864..1eb81ce927 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -80,7 +80,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
for (ProducerData data : heartbeatData.getProducerDataSet()) {
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-
this.remotingChannelManager.createProducerChannel(ctx.channel(),
data.getGroupName(), clientId),
+ this.remotingChannelManager.createProducerChannel(context,
ctx.channel(), data.getGroupName(), clientId),
clientId, request.getLanguage(),
request.getVersion());
setClientPropertiesToChannelAttr(clientChannelInfo);
@@ -89,7 +89,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-
this.remotingChannelManager.createConsumerChannel(ctx.channel(),
data.getGroupName(), clientId, data.getSubscriptionDataSet()),
+ this.remotingChannelManager.createConsumerChannel(context,
ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
clientId, request.getLanguage(),
request.getVersion());
setClientPropertiesToChannelAttr(clientChannelInfo);
@@ -122,7 +122,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
(UnregisterClientRequestHeader)
request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
final String producerGroup = requestHeader.getProducerGroup();
if (producerGroup != null) {
- RemotingChannel channel =
this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel());
+ RemotingChannel channel =
this.remotingChannelManager.removeProducerChannel(context, producerGroup,
ctx.channel());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
requestHeader.getClientID(),
@@ -132,7 +132,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
}
final String consumerGroup = requestHeader.getConsumerGroup();
if (consumerGroup != null) {
- RemotingChannel channel =
this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel());
+ RemotingChannel channel =
this.remotingChannelManager.removeConsumerChannel(context, consumerGroup,
ctx.channel());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
requestHeader.getClientID(),
@@ -170,7 +170,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo)
args[0];
- remotingChannelManager.removeConsumerChannel(group,
clientChannelInfo.getChannel());
+
remotingChannelManager.removeConsumerChannel(ProxyContext.createForInner(this.getClass()),
group, clientChannelInfo.getChannel());
log.info("remove remoting channel when client unregister.
clientChannelInfo:{}", clientChannelInfo);
}
}
@@ -187,7 +187,7 @@ public class ClientManagerActivity extends
AbstractRemotingActivity {
@Override
public void handle(ProducerGroupEvent event, String group,
ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
- remotingChannelManager.removeProducerChannel(group,
clientChannelInfo.getChannel());
+
remotingChannelManager.removeProducerChannel(ProxyContext.createForInner(this.getClass()),
group, clientChannelInfo.getChannel());
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index e9d42afc2c..b21b4afa42 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -83,7 +83,7 @@ public class ConsumerManagerActivity extends
AbstractRemotingActivity {
ProxyContext context) throws Exception {
RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
GetConsumerListByGroupRequestHeader header =
(GetConsumerListByGroupRequestHeader)
request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
- ConsumerGroupInfo consumerGroupInfo =
messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+ ConsumerGroupInfo consumerGroupInfo =
messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
List<String> clientIds = consumerGroupInfo.getAllClientId();
GetConsumerListByGroupResponseBody body = new
GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
@@ -96,7 +96,7 @@ public class ConsumerManagerActivity extends
AbstractRemotingActivity {
ProxyContext context) throws Exception {
RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class);
GetConsumerConnectionListRequestHeader header =
(GetConsumerConnectionListRequestHeader)
request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
- ConsumerGroupInfo consumerGroupInfo =
messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+ ConsumerGroupInfo consumerGroupInfo =
messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
if (consumerGroupInfo != null) {
ConsumerConnection bodydata = new ConsumerConnection();
bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index d548ddc0df..3324c231ab 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -41,7 +41,7 @@ public class PullMessageActivity extends
AbstractRemotingActivity {
PullMessageRequestHeader requestHeader = (PullMessageRequestHeader)
request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
int sysFlag = requestHeader.getSysFlag();
if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
- ConsumerGroupInfo consumerInfo =
messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ ConsumerGroupInfo consumerInfo =
messagingProcessor.getConsumerGroupInfo(context,
requestHeader.getConsumerGroup());
if (consumerInfo == null) {
return
RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
"the consumer's subscription not latest");
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
index 133865f48b..211c3c9275 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -57,11 +58,11 @@ public class RemotingChannelManager implements
StartAndShutdown {
return prefix + group;
}
- public RemotingChannel createProducerChannel(Channel channel, String
group, String clientId) {
+ public RemotingChannel createProducerChannel(ProxyContext ctx, Channel
channel, String group, String clientId) {
return createChannel(channel, buildProducerKey(group), clientId,
Collections.emptySet());
}
- public RemotingChannel createConsumerChannel(Channel channel, String
group, String clientId, Set<SubscriptionData> subscriptionData) {
+ public RemotingChannel createConsumerChannel(ProxyContext ctx, Channel
channel, String group, String clientId, Set<SubscriptionData> subscriptionData)
{
return createChannel(channel, buildConsumerKey(group), clientId,
subscriptionData);
}
@@ -96,11 +97,11 @@ public class RemotingChannelManager implements
StartAndShutdown {
return removedChannelSet;
}
- public RemotingChannel removeProducerChannel(String group, Channel
channel) {
+ public RemotingChannel removeProducerChannel(ProxyContext ctx, String
group, Channel channel) {
return removeChannel(buildProducerKey(group), channel);
}
- public RemotingChannel removeConsumerChannel(String group, Channel
channel) {
+ public RemotingChannel removeConsumerChannel(ProxyContext ctx, String
group, Channel channel) {
return removeChannel(buildConsumerKey(group), channel);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 3fa6414c39..b6b14faa49 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -26,19 +26,18 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -52,8 +51,6 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
protected final LoadingCache<String /* topicName */, MessageQueueView>
topicCache;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ThreadPoolExecutor cacheRefreshExecutor;
- private final TopicRouteCacheLoader topicRouteCacheLoader = new
TopicRouteCacheLoader();
-
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
@@ -76,13 +73,8 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
executor(cacheRefreshExecutor).build(new CacheLoader<String,
MessageQueueView>() {
@Override public @Nullable MessageQueueView load(String topic)
throws Exception {
try {
- TopicRouteData topicRouteData =
topicRouteCacheLoader.loadTopicRouteData(topic);
- if (isTopicRouteValid(topicRouteData)) {
- MessageQueueView tmp = new MessageQueueView(topic,
topicRouteData);
- log.info("load topic route from namesrv. topic:
{}, queue: {}", topic, tmp);
- return tmp;
- }
- return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+ TopicRouteData topicRouteData =
mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic,
Duration.ofSeconds(3).toMillis());
+ return buildMessageQueueView(topic, topicRouteData);
} catch (Exception e) {
if (TopicRouteHelper.isTopicNotExistError(e)) {
return MessageQueueView.WRAPPED_EMPTY_QUEUE;
@@ -138,44 +130,12 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
&& routeData.getBrokerDatas() != null &&
!routeData.getBrokerDatas().isEmpty();
}
- protected abstract class AbstractTopicRouteCacheLoader extends
AbstractCacheLoader<String, MessageQueueView> {
-
- public AbstractTopicRouteCacheLoader() {
- super(cacheRefreshExecutor);
- }
-
- protected abstract TopicRouteData loadTopicRouteData(String topic)
throws Exception;
-
- @Override
- public MessageQueueView getDirectly(String topic) throws Exception {
- try {
- TopicRouteData topicRouteData = loadTopicRouteData(topic);
-
- if (isTopicRouteValid(topicRouteData)) {
- MessageQueueView tmp = new MessageQueueView(topic,
topicRouteData);
- log.info("load topic route from namesrv. topic: {}, queue:
{}", topic, tmp);
- return tmp;
- }
- return MessageQueueView.WRAPPED_EMPTY_QUEUE;
- } catch (Exception e) {
- if (TopicRouteHelper.isTopicNotExistError(e)) {
- return MessageQueueView.WRAPPED_EMPTY_QUEUE;
- }
- throw e;
- }
- }
-
- @Override
- protected void onErr(String key, Exception e) {
- log.error("load topic route from namesrv failed. topic:{}", key,
e);
- }
- }
-
- protected class TopicRouteCacheLoader extends
AbstractTopicRouteCacheLoader {
-
- @Override
- protected TopicRouteData loadTopicRouteData(String topic) throws
Exception {
- return
mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic,
Duration.ofSeconds(3).toMillis());
+ protected MessageQueueView buildMessageQueueView(String topic,
TopicRouteData topicRouteData) {
+ if (isTopicRouteValid(topicRouteData)) {
+ MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
+ log.info("load topic route from namesrv. topic: {}, queue: {}",
topic, tmp);
+ return tmp;
}
+ return MessageQueueView.WRAPPED_EMPTY_QUEUE;
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index a5d4e3c919..0c1ebcdfae 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
+import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
@@ -341,7 +342,7 @@ public class ClientActivityTest extends BaseActivityTest {
String nonce = "123";
when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture)
runningInfoFutureMock);
ProxyContext context = createContext();
- StreamObserver<TelemetryCommand> streamObserver =
clientActivity.telemetry(context, new StreamObserver<TelemetryCommand>() {
+ ContextStreamObserver<TelemetryCommand> streamObserver =
clientActivity.telemetry(new StreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand value) {
}
@@ -354,7 +355,7 @@ public class ClientActivityTest extends BaseActivityTest {
public void onCompleted() {
}
});
- streamObserver.onNext(TelemetryCommand.newBuilder()
+ streamObserver.onNext(context, TelemetryCommand.newBuilder()
.setThreadStackTrace(ThreadStackTrace.newBuilder()
.setThreadStackTrace(jstack)
.setNonce(nonce)
@@ -373,7 +374,7 @@ public class ClientActivityTest extends BaseActivityTest {
String nonce = "123";
when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture)
resultFutureMock);
ProxyContext context = createContext();
- StreamObserver<TelemetryCommand> streamObserver =
clientActivity.telemetry(context, new StreamObserver<TelemetryCommand>() {
+ ContextStreamObserver<TelemetryCommand> streamObserver =
clientActivity.telemetry(new StreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand value) {
}
@@ -386,7 +387,7 @@ public class ClientActivityTest extends BaseActivityTest {
public void onCompleted() {
}
});
- streamObserver.onNext(TelemetryCommand.newBuilder()
+ streamObserver.onNext(context, TelemetryCommand.newBuilder()
.setVerifyMessageResult(VerifyMessageResult.newBuilder()
.setNonce(nonce)
.build())
@@ -418,11 +419,8 @@ public class ClientActivityTest extends BaseActivityTest {
}
};
- StreamObserver<TelemetryCommand> requestObserver =
this.clientActivity.telemetry(
- ctx,
- responseObserver
- );
- requestObserver.onNext(TelemetryCommand.newBuilder()
+ ContextStreamObserver<TelemetryCommand> requestObserver =
this.clientActivity.telemetry(responseObserver);
+ requestObserver.onNext(ctx, TelemetryCommand.newBuilder()
.setSettings(settings)
.build());
return future;
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
index 9044873a6d..6742f094c8 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
@@ -54,7 +54,7 @@ public class GrpcClientSettingsManagerTest extends
BaseActivityTest {
public void testGetProducerData() {
ProxyContext context =
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
- this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID,
Settings.newBuilder()
+ this.grpcClientSettingsManager.updateClientSettings(context,
CLIENT_ID, Settings.newBuilder()
.setBackoffPolicy(RetryPolicy.getDefaultInstance())
.setPublishing(Publishing.getDefaultInstance())
.build());
@@ -65,18 +65,18 @@ public class GrpcClientSettingsManagerTest extends
BaseActivityTest {
@Test
public void testGetSubscriptionData() {
+ ProxyContext context =
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
+
SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any()))
.thenReturn(subscriptionGroupConfig);
- this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID,
Settings.newBuilder()
+ this.grpcClientSettingsManager.updateClientSettings(context,
CLIENT_ID, Settings.newBuilder()
.setSubscription(Subscription.newBuilder()
.setGroup(Resource.newBuilder().setName("group").build())
.build())
.build());
- ProxyContext context =
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
-
Settings settings =
this.grpcClientSettingsManager.getClientSettings(context);
assertEquals(settings.getBackoffPolicy(),
this.grpcClientSettingsManager.createDefaultConsumerSettingsBuilder().build().getBackoffPolicy());
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
index d8ad451875..a2f1f4cc89 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
@@ -77,7 +77,7 @@ public class PullMessageActivityTest extends InitConfigTest {
@Test
public void testPullMessageWithoutSub() throws Exception {
- when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+ when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group)))
.thenReturn(consumerGroupInfoMock);
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setSubString(subString);
@@ -128,7 +128,7 @@ public class PullMessageActivityTest extends InitConfigTest
{
@Test
public void testPullMessageWithSub() throws Exception {
- when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+ when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group)))
.thenReturn(consumerGroupInfoMock);
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setSubString(subString);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
index 5a5b441e95..1122405937 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.util.HashSet;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -46,6 +47,7 @@ public class RemotingChannelManagerTest {
private final String remoteAddress = "10.152.39.53:9768";
private final String localAddress = "11.193.0.1:1210";
private RemotingChannelManager remotingChannelManager;
+ private final ProxyContext ctx =
ProxyContext.createForInner(this.getClass());
@Before
public void before() {
@@ -58,13 +60,13 @@ public class RemotingChannelManagerTest {
String clientId = RandomStringUtils.randomAlphabetic(10);
Channel producerChannel = createMockChannel();
- RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(producerChannel, group,
clientId);
+ RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group,
clientId);
assertNotNull(producerRemotingChannel);
- assertSame(producerRemotingChannel,
this.remotingChannelManager.createProducerChannel(producerChannel, group,
clientId));
+ assertSame(producerRemotingChannel,
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group,
clientId));
Channel consumerChannel = createMockChannel();
- RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(consumerChannel, group,
clientId, new HashSet<>());
- assertSame(consumerRemotingChannel,
this.remotingChannelManager.createConsumerChannel(consumerChannel, group,
clientId, new HashSet<>()));
+ RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group,
clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel,
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group,
clientId, new HashSet<>()));
assertNotNull(consumerRemotingChannel);
assertNotSame(producerRemotingChannel, consumerRemotingChannel);
@@ -77,14 +79,14 @@ public class RemotingChannelManagerTest {
{
Channel producerChannel = createMockChannel();
- RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(producerChannel, group,
clientId);
- assertSame(producerRemotingChannel,
this.remotingChannelManager.removeProducerChannel(group,
producerRemotingChannel));
+ RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group,
clientId);
+ assertSame(producerRemotingChannel,
this.remotingChannelManager.removeProducerChannel(ctx, group,
producerRemotingChannel));
assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
}
{
Channel producerChannel = createMockChannel();
- RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(producerChannel, group,
clientId);
- assertSame(producerRemotingChannel,
this.remotingChannelManager.removeProducerChannel(group, producerChannel));
+ RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group,
clientId);
+ assertSame(producerRemotingChannel,
this.remotingChannelManager.removeProducerChannel(ctx, group, producerChannel));
assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
}
}
@@ -96,14 +98,14 @@ public class RemotingChannelManagerTest {
{
Channel consumerChannel = createMockChannel();
- RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(consumerChannel, group,
clientId, new HashSet<>());
- assertSame(consumerRemotingChannel,
this.remotingChannelManager.removeConsumerChannel(group,
consumerRemotingChannel));
+ RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group,
clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel,
this.remotingChannelManager.removeConsumerChannel(ctx, group,
consumerRemotingChannel));
assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
}
{
Channel consumerChannel = createMockChannel();
- RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(consumerChannel, group,
clientId, new HashSet<>());
- assertSame(consumerRemotingChannel,
this.remotingChannelManager.removeConsumerChannel(group, consumerChannel));
+ RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group,
clientId, new HashSet<>());
+ assertSame(consumerRemotingChannel,
this.remotingChannelManager.removeConsumerChannel(ctx, group, consumerChannel));
assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
}
}
@@ -115,9 +117,9 @@ public class RemotingChannelManagerTest {
String clientId = RandomStringUtils.randomAlphabetic(10);
Channel consumerChannel = createMockChannel();
- RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(consumerChannel,
consumerGroup, clientId, new HashSet<>());
+ RemotingChannel consumerRemotingChannel =
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel,
consumerGroup, clientId, new HashSet<>());
Channel producerChannel = createMockChannel();
- RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(producerChannel,
producerGroup, clientId);
+ RemotingChannel producerRemotingChannel =
this.remotingChannelManager.createProducerChannel(ctx, producerChannel,
producerGroup, clientId);
assertSame(consumerRemotingChannel,
this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get());
assertSame(producerRemotingChannel,
this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get());
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
index 02912446cf..6766564bc7 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.protocol.body;
+import com.google.common.base.MoreObjects;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -59,4 +60,14 @@ public class LockBatchRequestBody extends
RemotingSerializable {
public void setMqSet(Set<MessageQueue> mqSet) {
this.mqSet = mqSet;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("consumerGroup", consumerGroup)
+ .add("clientId", clientId)
+ .add("onlyThisBroker", onlyThisBroker)
+ .add("mqSet", mqSet)
+ .toString();
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
index fcac7ed9ae..2ad906739c 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.protocol.body;
+import com.google.common.base.MoreObjects;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -59,4 +60,14 @@ public class UnlockBatchRequestBody extends
RemotingSerializable {
public void setMqSet(Set<MessageQueue> mqSet) {
this.mqSet = mqSet;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("consumerGroup", consumerGroup)
+ .add("clientId", clientId)
+ .add("onlyThisBroker", onlyThisBroker)
+ .add("mqSet", mqSet)
+ .toString();
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 5965e9dcbb..2ccf564df5 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.remoting.protocol.header;
+import com.google.common.base.MoreObjects;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -99,4 +100,17 @@ public class NotificationRequestHeader extends
TopicQueueRequestHeader {
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("consumerGroup", consumerGroup)
+ .add("topic", topic)
+ .add("queueId", queueId)
+ .add("pollTime", pollTime)
+ .add("bornTime", bornTime)
+ .add("order", order)
+ .add("attemptId", attemptId)
+ .toString();
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
index 39aaa01176..e16d38a7a3 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,6 +20,7 @@
*/
package org.apache.rocketmq.remoting.protocol.header;
+import com.google.common.base.MoreObjects;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -73,4 +74,14 @@ public class QueryConsumerOffsetRequestHeader extends
TopicQueueRequestHeader {
public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) {
this.setZeroIfNotFound = setZeroIfNotFound;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("consumerGroup", consumerGroup)
+ .add("topic", topic)
+ .add("queueId", queueId)
+ .add("setZeroIfNotFound", setZeroIfNotFound)
+ .toString();
+ }
}