This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f33ac2a3e [ISSUE #6714] Replace the deprecated method DefaultMQPushConsumer#getDefaultMQPushConsumerImpl (#6715)
f33ac2a3e is described below

commit f33ac2a3ece691bc15cb875726b5ad054a60ae22
Author: mxsm <[email protected]>
AuthorDate: Wed May 10 09:59:18 2023 +0800

    [ISSUE #6714] Replace the deprecated method DefaultMQPushConsumer#getDefaultMQPushConsumerImpl (#6715)
---
 .../client/consumer/DefaultMQPushConsumer.java         | 18 +++++++++++++++---
 .../impl/consumer/ConsumeMessageOrderlyService.java    |  2 +-
 .../impl/consumer/ConsumeMessagePopOrderlyService.java |  2 +-
 .../rocketmq/client/impl/consumer/ProcessQueue.java    |  2 +-
 .../example/tracemessage/OpenTracingPushConsumer.java  |  2 +-
 .../rocketmq/consumer/PushConsumerImpl.java            |  2 +-
 6 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index c11a3c642..1afb9113e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -417,10 +418,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         if (enableMsgTrace) {
             try {
                 AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
-                dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
+                dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
                 traceDispatcher = dispatcher;
-                this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
-                    new ConsumeMessageTraceHookImpl(traceDispatcher));
+                this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
             } catch (Throwable e) {
                 log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
             }
@@ -858,6 +858,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.defaultMQPushConsumerImpl.resume();
     }
 
+    public boolean isPause() {
+        return this.defaultMQPushConsumerImpl.isPause();
+    }
+
+    public boolean isConsumeOrderly() {
+        return this.defaultMQPushConsumerImpl.isConsumeOrderly();
+    }
+
+    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+        this.defaultMQPushConsumerImpl.registerConsumeMessageHook(hook);
+    }
+
     /**
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      */
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 75857d17d..f9c00839c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -388,7 +388,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
-            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+            this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg);
             return true;
         } catch (Exception e) {
             log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
index 8616cf109..ae6adfea5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
@@ -304,7 +304,7 @@ public class ConsumeMessagePopOrderlyService implements ConsumeMessageService {
             MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
-            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+            this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg);
             return true;
         } catch (Exception e) {
             log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 74238e024..ab94a9846 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -75,7 +75,7 @@ public class ProcessQueue {
      * @param pushConsumer
      */
     public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
-        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+        if (pushConsumer.isConsumeOrderly()) {
             return;
         }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
index 72295f366..9ac7c1634 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -41,7 +41,7 @@ public class OpenTracingPushConsumer {
 
         // Uncomment the following line while debugging, namesrvAddr should be set to your local address
 //        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
-        consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
+        consumer.registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
 
         consumer.subscribe(TOPIC, "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index d5d394a69..1675a16f1 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -101,7 +101,7 @@ public class PushConsumerImpl implements PushConsumer {
 
     @Override
     public boolean isSuspended() {
-        return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+        return this.rocketmqPushConsumer.isPause();
     }
 
     @Override

Reply via email to