This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aea5811df [ISSUE #6884] Resolve proxy sending mentality to broker and 
unable to find ACL configuration related (#6885)
aea5811df is described below

commit aea5811df007c2abf2d46eea931e4c867514e0eb
Author: 城南少年与猫 <[email protected]>
AuthorDate: Thu Jun 15 10:00:45 2023 +0800

    [ISSUE #6884] Resolve proxy sending mentality to broker and unable to find 
ACL configuration related (#6885)
    
    Co-authored-by: fengbaichao <[email protected]>
---
 .../org/apache/rocketmq/proxy/service/ClusterServiceManager.java  | 2 +-
 .../rocketmq/proxy/service/client/ClusterConsumerManager.java     | 5 +++--
 .../proxy/service/sysmessage/AbstractSystemMessageSyncer.java     | 8 +++++---
 .../apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java | 5 +++--
 .../rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java    | 4 ++--
 5 files changed, 14 insertions(+), 10 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 95cc4d149..d2ddfc352 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -97,7 +97,7 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
         this.adminService = new 
DefaultAdminService(this.operationClientAPIFactory);
 
         this.producerManager = new ProducerManager();
-        this.consumerManager = new 
ClusterConsumerManager(this.topicRouteService, this.adminService, 
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), 
proxyConfig.getChannelExpiredTimeout());
+        this.consumerManager = new 
ClusterConsumerManager(this.topicRouteService, this.adminService, 
this.operationClientAPIFactory, new ConsumerIdsChangeListenerImpl(), 
proxyConfig.getChannelExpiredTimeout(), rpcHook);
 
         this.transactionClientAPIFactory = new MQClientAPIFactory(
             nameserverAccessConfig,
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
index 07aeb23fc..65a4569f8 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
 import org.apache.rocketmq.proxy.service.sysmessage.HeartbeatSyncer;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -36,9 +37,9 @@ public class ClusterConsumerManager extends ConsumerManager 
implements StartAndS
     protected HeartbeatSyncer heartbeatSyncer;
 
     public ClusterConsumerManager(TopicRouteService topicRouteService, 
AdminService adminService,
-        MQClientAPIFactory mqClientAPIFactory, ConsumerIdsChangeListener 
consumerIdsChangeListener, long channelExpiredTimeout) {
+                                  MQClientAPIFactory mqClientAPIFactory, 
ConsumerIdsChangeListener consumerIdsChangeListener, long 
channelExpiredTimeout, RPCHook rpcHook) {
         super(consumerIdsChangeListener, channelExpiredTimeout);
-        this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, 
adminService, this, mqClientAPIFactory);
+        this.heartbeatSyncer = new HeartbeatSyncer(topicRouteService, 
adminService, this, mqClientAPIFactory, rpcHook);
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index 2ef849737..fcdc25cac 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -51,12 +51,14 @@ public abstract class AbstractSystemMessageSyncer 
implements StartAndShutdown, M
     protected final TopicRouteService topicRouteService;
     protected final AdminService adminService;
     protected final MQClientAPIFactory mqClientAPIFactory;
+    protected final RPCHook rpcHook;
     protected DefaultMQPushConsumer defaultMQPushConsumer;
 
-    public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, 
AdminService adminService, MQClientAPIFactory mqClientAPIFactory) {
+    public AbstractSystemMessageSyncer(TopicRouteService topicRouteService, 
AdminService adminService, MQClientAPIFactory mqClientAPIFactory, RPCHook 
rpcHook) {
         this.topicRouteService = topicRouteService;
         this.adminService = adminService;
         this.mqClientAPIFactory = mqClientAPIFactory;
+        this.rpcHook = rpcHook;
     }
 
     protected String getSystemMessageProducerId() {
@@ -84,8 +86,8 @@ public abstract class AbstractSystemMessageSyncer implements 
StartAndShutdown, M
         return 1;
     }
 
-    protected RPCHook getRpcHook() {
-        return null;
+    public RPCHook getRpcHook() {
+        return rpcHook;
     }
 
     protected void sendSystemMessage(Object data) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 3333ebd2d..f70c06b8f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -41,6 +41,7 @@ import 
org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -53,8 +54,8 @@ public class HeartbeatSyncer extends 
AbstractSystemMessageSyncer {
     protected String localProxyId;
 
     public HeartbeatSyncer(TopicRouteService topicRouteService, AdminService 
adminService,
-        ConsumerManager consumerManager, MQClientAPIFactory 
mqClientAPIFactory) {
-        super(topicRouteService, adminService, mqClientAPIFactory);
+                           ConsumerManager consumerManager, MQClientAPIFactory 
mqClientAPIFactory, RPCHook rpcHook) {
+        super(topicRouteService, adminService, mqClientAPIFactory, rpcHook);
         this.consumerManager = consumerManager;
         this.localProxyId = buildLocalProxyId();
         this.init();
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 6373aba30..c67f4953d 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -172,7 +172,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
             .build();
         
when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings);
 
-        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory);
+        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory, null);
         heartbeatSyncer.onConsumerRegister(
             consumerGroup,
             clientChannelInfo,
@@ -240,7 +240,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
             4
         );
 
-        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory);
+        HeartbeatSyncer heartbeatSyncer = new 
HeartbeatSyncer(topicRouteService, adminService, consumerManager, 
mqClientAPIFactory, null);
         SendResult okSendResult = new SendResult();
         okSendResult.setSendStatus(SendStatus.SEND_OK);
         {

Reply via email to