largeTree edited a comment on issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496#issuecomment-747846243


   I use this because I need to construct and send messages during the 
transaction, but I need these messages to be delivered after the transaction is 
committed
   
   Just like the demo below
   
   ```
   public class ProducerUtils {
   
        private static TransactionMQProducer producer;
        private static ThreadLocal<Map<String, SendResult>> txMessageCache = 
new ThreadLocal<>();
   
        private static void init() {
                try {
                        producer = new 
TransactionMQProducer("transactionMQProducer");
                        producer.setNamesrvAddr("ip:port");
                        producer.setTransactionListener(new 
SelfTransactionListener());
                        producer.start();
                } catch (MQClientException e) {
                        e.printStackTrace();
                }
        }
   
        /**
         * send message in transacion and cache sendResult
         * 
         * @param msg
         * @param bizKey
         */
        public static void sendPrepare(String msg, String bizKey) {
                if (producer == null) {
                        init();
                }
                Message message = new Message("TransactionTopic", 
"transactionTest", bizKey, msg.getBytes());
                try {
                        TransactionSendResult sendResult = 
producer.sendMessageInTransaction(message, bizKey);
                        
                        // cached in threadlocal 
                        Map<String, SendResult> map = txMessageCache.get();
                        if (map == null) {
                                map = new HashMap<>();
                                txMessageCache.set(map);
                        }
                        map.put(sendResult.getMsgId(), sendResult);
                        
                        System.out.println("cache sendResult:" + 
sendResult.getMsgId() + ", txId = " + sendResult.getTransactionId());
                } catch (MQClientException e) {
                        e.printStackTrace();
                }
        }
   
        public static void commit() {
                endTransaction(LocalTransactionState.COMMIT_MESSAGE);
        }
   
        public static void rollback() {
                endTransaction(LocalTransactionState.ROLLBACK_MESSAGE);
        }
        
        public static Map<String, SendResult> getTxMessageCacheMap() {
                return txMessageCache.get();
        }
   
        private static void endTransaction(LocalTransactionState 
localTransactionState) {
                Map<String, SendResult> cacheMap = txMessageCache.get();
                if (cacheMap == null) {
                        return;
                }
                
                DefaultMQProducerImpl defaultMQProducerImpl = 
producer.getDefaultMQProducerImpl();
                for (Iterator<Map.Entry<String, SendResult>> iter = 
cacheMap.entrySet().iterator(); iter.hasNext();) {
                        try {
                                
                                Entry<String, SendResult> entry = iter.next();
                                System.out.println("doEndTransaction : " + 
localTransactionState);
                                // Manually commit or roll back
                                
defaultMQProducerImpl.endTransaction(entry.getValue(), localTransactionState, 
null);
                                iter.remove();
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                }
        }
   }
   
   public class SelfTransactionListener implements TransactionListener {
   
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, 
Object arg) {
                return LocalTransactionState.UNKNOW;
        }
   
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("check local transaction status :" + new 
String(msg.getBody()));
                String transactionId = msg.getTransactionId();
                System.out.println("transactionId:" + transactionId);
   
                Map<String, SendResult> cacheMap = 
ProducerUtils.getTxMessageCacheMap();
                SendResult sendResult = null;
                if (cacheMap != null) {
                        sendResult = cacheMap.get(msg.getMsgId());
                }
   
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                if (sendResult == null && checkTxCommited(msg)) {
                        state = LocalTransactionState.COMMIT_MESSAGE;
                } else if (sendResult == null) { // sendResult not in the cache 
anymore
                        state = LocalTransactionState.ROLLBACK_MESSAGE;
                }
   
                return state;
        }
   
        /**
         * check tx state from database
         * 
         * @param msg
         * @return
         */
        private boolean checkTxCommited(MessageExt msg) {
                return false;  // or true 
        }
   }
   
   @Service
   public class TxService {
   
        @Transactional
        public void doInTx(String msg, String key) {
                
                // Do some operations on the database with transaction 
                
                // send transaction message and executeLocalTransaction always 
return LocalTransactionState.UNKNOW
                   // Because it's in localTransaction right now
                ProducerUtils.sendPrepare(msg, key);
   
                TransactionSynchronizationManager.registerSynchronization(new 
TransactionSynchronization() {
                        // When the spring transaction is complete, I will 
manually commit or roll back the transaction message sent earlier
                        @Override
                        public void afterCompletion(int state) {
                                if (TransactionSynchronization.STATUS_COMMITTED 
== state) {
                                        ProducerUtils.commit(); // commit 
                                } else if 
(TransactionSynchronization.STATUS_ROLLED_BACK == state) {
                                        ProducerUtils.rollback(); // rollback
                                }
                        }
                });
        }
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to