This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a7d493b2fb transactionProducer get the topic route before sending the message (#7569)
a7d493b2fb is described below

commit a7d493b2fbc153cc6cbdf2b2ffcbf19cf7cba803
Author: panzhi <panzh...@qq.com>
AuthorDate: Tue Nov 21 20:55:35 2023 +0800

    transactionProducer get the topic route before sending the message (#7569)
---
 .../impl/producer/DefaultMQProducerImpl.java       | 15 ++++++
 .../client/producer/DefaultMQProducer.java         | 63 ++++++++++++++++++++++
 .../client/producer/TransactionMQProducer.java     | 23 ++++++--
 .../example/transaction/TransactionProducer.java   |  3 +-
 4 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 545f17d931..088bff0891 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -262,6 +262,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     mQClientFactory.start();
                 }
 
+                this.initTopicRoute();
+
                 this.mqFaultStrategy.startDetector();
 
                 log.info("the producer [{}] start OK. 
sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
@@ -1740,6 +1742,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         }
     }
 
+    private void initTopicRoute() {
+        List<String> topics = this.defaultMQProducer.getTopics();
+        if (topics != null && topics.size() > 0) {
+            topics.forEach(topic -> {
+                String newTopic = 
NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), topic);
+                TopicPublishInfo topicPublishInfo = 
tryToFindTopicPublishInfo(newTopic);
+                if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+                    log.warn("No route info of this topic: " + newTopic + 
FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO));
+                }
+            });
+        }
+    }
+
     public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
         return topicPublishInfoTable;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 7bd3876f5a..700e00aac1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -87,6 +87,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      */
     private String producerGroup;
 
+    /**
+     * Topics that need to be initialized for transaction producer
+     */
+    private List<String> topics;
+
     /**
      * Just for testing or demo program
      */
@@ -235,6 +240,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
 
+    /**
+     * Constructor specifying namespace, producer group, topics and RPC hook.
+     *
+     * @param namespace     Namespace for this MQ Producer instance.
+     * @param producerGroup Producer group, see the name-sake field.
+     * @param topics        Topic that needs to be initialized for routing
+     * @param rpcHook       RPC hook to execute per each remoting command 
execution.
+     */
+    public DefaultMQProducer(final String namespace, final String 
producerGroup, final List<String> topics, RPCHook rpcHook) {
+        this.namespace = namespace;
+        this.producerGroup = producerGroup;
+        this.topics = topics;
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+    }
+
     /**
      * Constructor specifying producer group and enabled msg trace flag.
      *
@@ -290,6 +311,41 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         }
     }
 
+    /**
+     * Constructor specifying namespace, producer group, topics, RPC hook, 
enabled msgTrace flag and customized trace topic
+     * name.
+     *
+     * @param namespace            Namespace for this MQ Producer instance.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param topics               Topic that needs to be initialized for 
routing
+     * @param rpcHook              RPC hook to execute per each remoting 
command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
+     * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
+     *                             trace topic name.
+     */
+    public DefaultMQProducer(final String namespace, final String 
producerGroup, final List<String> topics,
+                             RPCHook rpcHook, boolean enableMsgTrace, final 
String customizedTraceTopic) {
+        this.namespace = namespace;
+        this.producerGroup = producerGroup;
+        this.topics = topics;
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+        //if client open the message trace feature
+        if (enableMsgTrace) {
+            try {
+                AsyncTraceDispatcher dispatcher = new 
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, 
customizedTraceTopic, rpcHook);
+                dispatcher.setHostProducer(this.defaultMQProducerImpl);
+                traceDispatcher = dispatcher;
+                this.defaultMQProducerImpl.registerSendMessageHook(
+                        new SendMessageTraceHookImpl(traceDispatcher));
+                this.defaultMQProducerImpl.registerEndTransactionHook(
+                        new EndTransactionTraceHookImpl(traceDispatcher));
+            } catch (Throwable e) {
+                logger.error("system mqtrace hook init failed ,maybe can't 
send msg trace data");
+            }
+        }
+    }
+
     @Override
     public void setUseTLS(boolean useTLS) {
         super.setUseTLS(useTLS);
@@ -1316,4 +1372,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         
defaultMQProducerImpl.setSemaphoreAsyncSendSize(backPressureForAsyncSendSize);
     }
 
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index d529f3e778..2c3b479f77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.Message;
@@ -36,19 +37,31 @@ public class TransactionMQProducer extends DefaultMQProducer {
     }
 
     public TransactionMQProducer(final String producerGroup) {
-        this(null, producerGroup, null);
+        this(null, producerGroup, null, null);
+    }
+
+    public TransactionMQProducer(final String producerGroup, final 
List<String> topics) {
+        this(null, producerGroup, topics, null);
     }
 
     public TransactionMQProducer(final String namespace, final String 
producerGroup) {
-        this(namespace, producerGroup, null);
+        this(namespace, producerGroup, null, null);
+    }
+
+    public TransactionMQProducer(final String namespace, final String 
producerGroup, final List<String> topics) {
+        this(namespace, producerGroup, topics, null);
     }
 
     public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
-        this(null, producerGroup, rpcHook);
+        this(null, producerGroup, null, rpcHook);
+    }
+
+    public TransactionMQProducer(final String producerGroup, final 
List<String> topics, RPCHook rpcHook) {
+        this(null, producerGroup, topics, rpcHook);
     }
 
-    public TransactionMQProducer(final String namespace, final String 
producerGroup, RPCHook rpcHook) {
-        super(namespace, producerGroup, rpcHook);
+    public TransactionMQProducer(final String namespace, final String 
producerGroup, final List<String> topics, RPCHook rpcHook) {
+        super(namespace, producerGroup, topics, rpcHook);
     }
 
     public TransactionMQProducer(final String namespace, final String 
producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String 
customizedTraceTopic) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
index 5973c3c306..d1d57c55ef 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -39,7 +40,7 @@ public class TransactionProducer {
 
     public static void main(String[] args) throws MQClientException, 
InterruptedException {
         TransactionListener transactionListener = new 
TransactionListenerImpl();
-        TransactionMQProducer producer = new 
TransactionMQProducer(PRODUCER_GROUP);
+        TransactionMQProducer producer = new 
TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));
 
         // Uncomment the following line while debugging, namesrvAddr should be 
set to your local address
 //        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

Reply via email to