This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 88e644756b [ISSUE #7988] Refactor client trace (#7989)
88e644756b is described below

commit 88e644756bab0ebc01feba53483d831d477f0627
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Tue Apr 2 11:31:30 2024 +0800

    [ISSUE #7988] Refactor client trace (#7989)
    
    * [ISSUE #7988] Refactor client trace
    
    * build trace dispatcher in start method
    * setNamespaceV2 for dispatcher
    * disable trace for inner traceProducer
    * fix tls
---
 .../org/apache/rocketmq/client/ClientConfig.java   | 32 ++++++++++++
 .../client/consumer/DefaultLitePullConsumer.java   | 31 +++++------
 .../client/consumer/DefaultMQPushConsumer.java     | 48 ++++++++---------
 .../client/producer/DefaultMQProducer.java         | 60 +++++++++-------------
 .../client/trace/AsyncTraceDispatcher.java         | 11 ++++
 .../DefaultMQConsumerWithOpenTracingTest.java      |  2 +
 .../trace/DefaultMQConsumerWithTraceTest.java      | 10 ++--
 .../DefaultMQProducerWithOpenTracingTest.java      |  2 +
 .../trace/DefaultMQProducerWithTraceTest.java      | 13 ++---
 .../TransactionMQProducerWithOpenTracingTest.java  |  2 +
 .../trace/TransactionMQProducerWithTraceTest.java  |  5 +-
 11 files changed, 124 insertions(+), 92 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java 
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8a7beffc70..48c995301a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -98,6 +98,16 @@ public class ClientConfig {
 
     private boolean enableHeartbeatChannelEventListener = true;
 
+    /**
+     * The switch for message trace
+     */
+    protected boolean enableTrace = true;
+
+    /**
+     * The name value of message trace topic. If not set, the default trace 
topic name will be used.
+     */
+    protected String traceTopic;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -215,6 +225,8 @@ public class ClientConfig {
         this.detectInterval = cc.detectInterval;
         this.detectTimeout = cc.detectTimeout;
         this.namespaceV2 = cc.namespaceV2;
+        this.enableTrace = cc.enableTrace;
+        this.traceTopic = cc.traceTopic;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -245,6 +257,8 @@ public class ClientConfig {
         cc.detectInterval = detectInterval;
         cc.detectTimeout = detectTimeout;
         cc.namespaceV2 = namespaceV2;
+        cc.enableTrace = enableTrace;
+        cc.traceTopic = traceTopic;
         return cc;
     }
 
@@ -474,6 +488,22 @@ public class ClientConfig {
         this.useHeartbeatV2 = useHeartbeatV2;
     }
 
+    public boolean isEnableTrace() {
+        return enableTrace;
+    }
+
+    public void setEnableTrace(boolean enableTrace) {
+        this.enableTrace = enableTrace;
+    }
+
+    public String getTraceTopic() {
+        return traceTopic;
+    }
+
+    public void setTraceTopic(String traceTopic) {
+        this.traceTopic = traceTopic;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig{" +
@@ -505,6 +535,8 @@ public class ClientConfig {
             ", sendLatencyEnable=" + sendLatencyEnable +
             ", startDetectorEnable=" + startDetectorEnable +
             ", enableHeartbeatChannelEventListener=" + 
enableHeartbeatChannelEventListener +
+            ", enableTrace=" + enableTrace +
+            ", traceTopic='" + traceTopic + '\'' +
             '}';
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index c193c6a42e..3364df48f8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -169,15 +169,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      */
     private TraceDispatcher traceDispatcher = null;
 
-    /**
-     * The flag for message trace
-     */
-    private boolean enableMsgTrace = false;
-
-    /**
-     * The name value of message trace topic.If you don't config,you can use 
the default trace topic name.
-     */
-    private String customizedTraceTopic;
+    private RPCHook rpcHook;
 
     /**
      * Default constructor.
@@ -212,6 +204,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      */
     public DefaultLitePullConsumer(final String consumerGroup, RPCHook 
rpcHook) {
         this.consumerGroup = consumerGroup;
+        this.rpcHook = rpcHook;
         this.enableStreamRequestType = true;
         defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, 
rpcHook);
     }
@@ -226,6 +219,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     public DefaultLitePullConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
+        this.rpcHook = rpcHook;
         this.enableStreamRequestType = true;
         defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, 
rpcHook);
     }
@@ -592,15 +586,12 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         return traceDispatcher;
     }
 
-    public void setCustomizedTraceTopic(String customizedTraceTopic) {
-        this.customizedTraceTopic = customizedTraceTopic;
-    }
-
     private void setTraceDispatcher() {
-        if (isEnableMsgTrace()) {
+        if (enableTrace) {
             try {
-                AsyncTraceDispatcher traceDispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
customizedTraceTopic, null);
+                AsyncTraceDispatcher traceDispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, 
rpcHook);
                 traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
+                traceDispatcher.setNamespaceV2(namespaceV2);
                 this.traceDispatcher = traceDispatcher;
                 this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
                     new ConsumeMessageTraceHookImpl(traceDispatcher));
@@ -611,14 +602,18 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     }
 
     public String getCustomizedTraceTopic() {
-        return customizedTraceTopic;
+        return traceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.traceTopic = customizedTraceTopic;
     }
 
     public boolean isEnableMsgTrace() {
-        return enableMsgTrace;
+        return enableTrace;
     }
 
     public void setEnableMsgTrace(boolean enableMsgTrace) {
-        this.enableMsgTrace = enableMsgTrace;
+        this.enableTrace = enableMsgTrace;
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 502c5ef184..312f4632ca 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -293,6 +293,8 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     // force to use client rebalance
     private boolean clientRebalance = true;
 
+    private RPCHook rpcHook = null;
+
     /**
      * Default constructor.
      */
@@ -327,6 +329,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
      */
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
         this.consumerGroup = consumerGroup;
+        this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = new 
AllocateMessageQueueAveragely();
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
     }
@@ -353,6 +356,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.consumerGroup = consumerGroup;
+        this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
     }
@@ -369,18 +373,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
         AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
+        this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
-        if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
customizedTraceTopic, rpcHook);
-                dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
-                traceDispatcher = dispatcher;
-                this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new 
ConsumeMessageTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
-            }
-        }
+        this.enableTrace = enableMsgTrace;
+        this.traceTopic = customizedTraceTopic;
     }
 
     /**
@@ -419,6 +416,7 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.consumerGroup = consumerGroup;
         this.namespace = namespace;
+        this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
     }
@@ -438,18 +436,11 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean 
enableMsgTrace, final String customizedTraceTopic) {
         this.consumerGroup = consumerGroup;
         this.namespace = namespace;
+        this.rpcHook = rpcHook;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, 
rpcHook);
-        if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, 
customizedTraceTopic, rpcHook);
-                dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
-                traceDispatcher = dispatcher;
-                this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new 
ConsumeMessageTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
-            }
-        }
+        this.enableTrace = enableMsgTrace;
+        this.traceTopic = customizedTraceTopic;
     }
 
     /**
@@ -464,9 +455,6 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     @Override
     public void setUseTLS(boolean useTLS) {
         super.setUseTLS(useTLS);
-        if (traceDispatcher instanceof AsyncTraceDispatcher) {
-            ((AsyncTraceDispatcher) 
traceDispatcher).getTraceProducer().setUseTLS(useTLS);
-        }
     }
 
     /**
@@ -750,7 +738,21 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     public void start() throws MQClientException {
         setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), 
this.consumerGroup));
         this.defaultMQPushConsumerImpl.start();
+        if (enableTrace) {
+            try {
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, 
rpcHook);
+                dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
+                dispatcher.setNamespaceV2(namespaceV2);
+                traceDispatcher = dispatcher;
+                this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new 
ConsumeMessageTraceHookImpl(traceDispatcher));
+            } catch (Throwable e) {
+                log.error("system mqtrace hook init failed ,maybe can't send 
msg trace data");
+            }
+        }
         if (null != traceDispatcher) {
+            if (traceDispatcher instanceof AsyncTraceDispatcher) {
+                ((AsyncTraceDispatcher) 
traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
+            }
             try {
                 traceDispatcher.start(this.getNamesrvAddr(), 
this.getAccessChannel());
             } catch (MQClientException e) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index cabe96ca7b..0abf925a82 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -167,6 +167,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      */
     private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
 
+    private RPCHook rpcHook = null;
+
     /**
      * Default constructor.
      */
@@ -202,6 +204,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      */
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
         this.producerGroup = producerGroup;
+        this.rpcHook = rpcHook;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
         produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
@@ -243,20 +246,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, 
boolean enableMsgTrace,
         final String customizedTraceTopic) {
         this(producerGroup, rpcHook);
-        //if client open the message trace feature
-        if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
customizedTraceTopic, rpcHook);
-                dispatcher.setHostProducer(this.defaultMQProducerImpl);
-                traceDispatcher = dispatcher;
-                this.defaultMQProducerImpl.registerSendMessageHook(
-                    new SendMessageTraceHookImpl(traceDispatcher));
-                this.defaultMQProducerImpl.registerEndTransactionHook(
-                    new EndTransactionTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                logger.error("system mqtrace hook init failed ,maybe can't 
send msg trace data");
-            }
-        }
+        this.enableTrace = enableMsgTrace;
+        this.traceTopic = customizedTraceTopic;
     }
 
     /**
@@ -298,6 +289,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     public DefaultMQProducer(final String namespace, final String 
producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
+        this.rpcHook = rpcHook;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
         produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
@@ -318,27 +310,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         boolean enableMsgTrace, final String customizedTraceTopic) {
         this(namespace, producerGroup, rpcHook);
         //if client open the message trace feature
-        if (enableMsgTrace) {
-            try {
-                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
customizedTraceTopic, rpcHook);
-                dispatcher.setHostProducer(this.defaultMQProducerImpl);
-                traceDispatcher = dispatcher;
-                this.defaultMQProducerImpl.registerSendMessageHook(
-                    new SendMessageTraceHookImpl(traceDispatcher));
-                this.defaultMQProducerImpl.registerEndTransactionHook(
-                    new EndTransactionTraceHookImpl(traceDispatcher));
-            } catch (Throwable e) {
-                logger.error("system mqtrace hook init failed ,maybe can't 
send msg trace data");
-            }
-        }
-    }
-
-    @Override
-    public void setUseTLS(boolean useTLS) {
-        super.setUseTLS(useTLS);
-        if (traceDispatcher instanceof AsyncTraceDispatcher) {
-            ((AsyncTraceDispatcher) 
traceDispatcher).getTraceProducer().setUseTLS(useTLS);
-        }
+        this.enableTrace = enableMsgTrace;
+        this.traceTopic = customizedTraceTopic;
     }
 
     /**
@@ -356,7 +329,24 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         if (this.produceAccumulator != null) {
             this.produceAccumulator.start();
         }
+        if (enableTrace) {
+            try {
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, 
rpcHook);
+                dispatcher.setHostProducer(this.defaultMQProducerImpl);
+                dispatcher.setNamespaceV2(this.namespaceV2);
+                traceDispatcher = dispatcher;
+                this.defaultMQProducerImpl.registerSendMessageHook(
+                    new SendMessageTraceHookImpl(traceDispatcher));
+                this.defaultMQProducerImpl.registerEndTransactionHook(
+                    new EndTransactionTraceHookImpl(traceDispatcher));
+            } catch (Throwable e) {
+                logger.error("system mqtrace hook init failed ,maybe can't 
send msg trace data");
+            }
+        }
         if (null != traceDispatcher) {
+            if (traceDispatcher instanceof AsyncTraceDispatcher) {
+                ((AsyncTraceDispatcher) 
traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
+            }
             try {
                 traceDispatcher.start(this.getNamesrvAddr(), 
this.getAccessChannel());
             } catch (MQClientException e) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ea423b7176..d44f22616f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -78,6 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
     private String group;
     private Type type;
+    private String namespaceV2;
 
     public AsyncTraceDispatcher(String group, Type type, String 
traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
@@ -144,10 +145,20 @@ public class AsyncTraceDispatcher implements 
TraceDispatcher {
         this.hostConsumer = hostConsumer;
     }
 
+    public String getNamespaceV2() {
+        return namespaceV2;
+    }
+
+    public void setNamespaceV2(String namespaceV2) {
+        this.namespaceV2 = namespaceV2;
+    }
+
     public void start(String nameSrvAddr, AccessChannel accessChannel) throws 
MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
             traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + 
nameSrvAddr);
+            traceProducer.setNamespaceV2(namespaceV2);
+            traceProducer.setEnableTrace(false);
             traceProducer.start();
         }
         this.accessChannel = accessChannel;
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index a39ae4a4de..028445ef2d 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -135,6 +135,8 @@ public class DefaultMQConsumerWithOpenTracingTest {
             new ConsumeMessageOpenTracingHookImpl(tracer));
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
+        // disable trace to let mock trace work
+        pushConsumer.setEnableTrace(false);
 
         OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
         Mockito.when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(0L);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 60aa446bbe..fc63cce1ce 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -128,11 +128,9 @@ public class DefaultMQConsumerWithTraceTest {
         normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, 
false, "");
         customTraceTopicPushConsumer = new 
DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+        pushConsumer.setUseTLS(true);
         pushConsumer.setPullInterval(60 * 1000);
 
-        asyncTraceDispatcher = (AsyncTraceDispatcher) 
pushConsumer.getTraceDispatcher();
-        traceProducer = asyncTraceDispatcher.getTraceProducer();
-
         pushConsumer.registerMessageListener(new MessageListenerConcurrently() 
{
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
@@ -157,6 +155,9 @@ public class DefaultMQConsumerWithTraceTest {
 
         pushConsumer.start();
 
+        asyncTraceDispatcher = (AsyncTraceDispatcher) 
pushConsumer.getTraceDispatcher();
+        traceProducer = asyncTraceDispatcher.getTraceProducer();
+
         mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
         mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
 
@@ -242,9 +243,6 @@ public class DefaultMQConsumerWithTraceTest {
 
     @Test
     public void testPushConsumerWithTraceTLS() {
-        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("consumerGroup", true, null);
-        consumer.setUseTLS(true);
-        AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) 
consumer.getTraceDispatcher();
         Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
     }
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
index 8fbc70ea44..9ce9d6b494 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -88,6 +88,8 @@ public class DefaultMQProducerWithOpenTracingTest {
                 new SendMessageOpenTracingHookImpl(tracer));
         producer.setNamesrvAddr("127.0.0.1:9876");
         message = new Message(topic, new byte[] {'a', 'b', 'c'});
+        // disable trace to let mock trace work
+        producer.setEnableTrace(false);
 
         producer.start();
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index ee17335185..ed680d8e6c 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -92,14 +92,14 @@ public class DefaultMQProducerWithTraceTest {
         normalProducer.setNamesrvAddr("127.0.0.1:9877");
         customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
         message = new Message(topic, new byte[] {'a', 'b', 'c'});
-        asyncTraceDispatcher = (AsyncTraceDispatcher) 
producer.getTraceDispatcher();
-        asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
-        asyncTraceDispatcher.getHostProducer();
-        asyncTraceDispatcher.getHostConsumer();
-        traceProducer = asyncTraceDispatcher.getTraceProducer();
+        producer.setTraceTopic(customerTraceTopic);
+        producer.setUseTLS(true);
 
         producer.start();
 
+        asyncTraceDispatcher = (AsyncTraceDispatcher) 
producer.getTraceDispatcher();
+        traceProducer = asyncTraceDispatcher.getTraceProducer();
+
         Field field = 
DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
@@ -150,9 +150,6 @@ public class DefaultMQProducerWithTraceTest {
 
     @Test
     public void testProducerWithTraceTLS() {
-        DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, 
true, null);
-        producer.setUseTLS(true);
-        AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) 
producer.getTraceDispatcher();
         Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
     }
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
index 5646a17dbe..5d4b81d16d 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -103,6 +103,8 @@ public class TransactionMQProducerWithOpenTracingTest {
         producer.getDefaultMQProducerImpl().registerSendMessageHook(new 
SendMessageOpenTracingHookImpl(tracer));
         producer.getDefaultMQProducerImpl().registerEndTransactionHook(new 
EndTransactionOpenTracingHookImpl(tracer));
         producer.setTransactionListener(transactionListener);
+        // disable trace to let mock trace work
+        producer.setEnableTrace(false);
 
         producer.setNamesrvAddr("127.0.0.1:9876");
         message = new Message(topic, new byte[] {'a', 'b', 'c'});
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index 8cf87444c0..9f6036153b 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -111,11 +111,12 @@ public class TransactionMQProducerWithTraceTest {
 
         producer.setNamesrvAddr("127.0.0.1:9876");
         message = new Message(topic, new byte[] {'a', 'b', 'c'});
-        asyncTraceDispatcher = (AsyncTraceDispatcher) 
producer.getTraceDispatcher();
-        traceProducer = asyncTraceDispatcher.getTraceProducer();
 
         producer.start();
 
+        asyncTraceDispatcher = (AsyncTraceDispatcher) 
producer.getTraceDispatcher();
+        traceProducer = asyncTraceDispatcher.getTraceProducer();
+
         Field field = 
DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);

Reply via email to