This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f33ac2a3e [ISSUE #6714] Replace the deprecated method
DefaultMQPushConsumer#getDefaultMQPushConsumerImpl (#6715)
f33ac2a3e is described below
commit f33ac2a3ece691bc15cb875726b5ad054a60ae22
Author: mxsm <[email protected]>
AuthorDate: Wed May 10 09:59:18 2023 +0800
[ISSUE #6714] Replace the deprecated method
DefaultMQPushConsumer#getDefaultMQPushConsumerImpl (#6715)
---
.../client/consumer/DefaultMQPushConsumer.java | 18 +++++++++++++++---
.../impl/consumer/ConsumeMessageOrderlyService.java | 2 +-
.../impl/consumer/ConsumeMessagePopOrderlyService.java | 2 +-
.../rocketmq/client/impl/consumer/ProcessQueue.java | 2 +-
.../example/tracemessage/OpenTracingPushConsumer.java | 2 +-
.../rocketmq/consumer/PushConsumerImpl.java | 2 +-
6 files changed, 20 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index c11a3c642..1afb9113e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -29,6 +29,7 @@ import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -417,10 +418,9 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
customizedTraceTopic, rpcHook);
-
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
+ dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
traceDispatcher = dispatcher;
- this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
- new ConsumeMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new
ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send
msg trace data");
}
@@ -858,6 +858,18 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
this.defaultMQPushConsumerImpl.resume();
}
+ public boolean isPause() {
+ return this.defaultMQPushConsumerImpl.isPause();
+ }
+
+ public boolean isConsumeOrderly() {
+ return this.defaultMQPushConsumerImpl.isConsumeOrderly();
+ }
+
+ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+ this.defaultMQPushConsumerImpl.registerConsumeMessageHook(hook);
+ }
+
/**
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*/
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 75857d17d..f9c00839c 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -388,7 +388,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
MessageAccessor.clearProperty(newMsg,
MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+
this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " +
this.consumerGroup + " msg: " + msg.toString(), e);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
index 8616cf109..ae6adfea5 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
@@ -304,7 +304,7 @@ public class ConsumeMessagePopOrderlyService implements
ConsumeMessageService {
MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
+
this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " +
this.consumerGroup + " msg: " + msg.toString(), e);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 74238e024..ab94a9846 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -75,7 +75,7 @@ public class ProcessQueue {
* @param pushConsumer
*/
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
- if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
+ if (pushConsumer.isConsumeOrderly()) {
return;
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
index 72295f366..9ac7c1634 100644
---
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -41,7 +41,7 @@ public class OpenTracingPushConsumer {
// Uncomment the following line while debugging, namesrvAddr should be
set to your local address
// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
- consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new
ConsumeMessageOpenTracingHookImpl(tracer));
+ consumer.registerConsumeMessageHook(new
ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
diff --git
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index d5d394a69..1675a16f1 100644
---
a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++
b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -101,7 +101,7 @@ public class PushConsumerImpl implements PushConsumer {
@Override
public boolean isSuspended() {
- return
this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+ return this.rocketmqPushConsumer.isPause();
}
@Override