largeTree edited a comment on issue #2496:
URL: https://github.com/apache/rocketmq/issues/2496#issuecomment-747846243
I use this because I need to construct and send messages during the
transaction, but I need these messages to be delivered after the transaction is
committed
Just like the demo below
```
public class ProducerUtils {
private static TransactionMQProducer producer;
private static ThreadLocal<Map<String, SendResult>> txMessageCache =
new ThreadLocal<>();
private static void init() {
try {
producer = new
TransactionMQProducer("transactionMQProducer");
producer.setNamesrvAddr("ip:port");
producer.setTransactionListener(new
SelfTransactionListener());
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* send message in transacion and cache sendResult
*
* @param msg
* @param bizKey
*/
public static void sendPrepare(String msg, String bizKey) {
if (producer == null) {
init();
}
Message message = new Message("TransactionTopic",
"transactionTest", bizKey, msg.getBytes());
try {
TransactionSendResult sendResult =
producer.sendMessageInTransaction(message, bizKey);
// cached in threadlocal
Map<String, SendResult> map = txMessageCache.get();
if (map == null) {
map = new HashMap<>();
txMessageCache.set(map);
}
map.put(sendResult.getMsgId(), sendResult);
System.out.println("cache sendResult:" +
sendResult.getMsgId() + ", txId = " + sendResult.getTransactionId());
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static void commit() {
endTransaction(LocalTransactionState.COMMIT_MESSAGE);
}
public static void rollback() {
endTransaction(LocalTransactionState.ROLLBACK_MESSAGE);
}
public static Map<String, SendResult> getTxMessageCacheMap() {
return txMessageCache.get();
}
private static void endTransaction(LocalTransactionState
localTransactionState) {
Map<String, SendResult> cacheMap = txMessageCache.get();
if (cacheMap == null) {
return;
}
DefaultMQProducerImpl defaultMQProducerImpl =
producer.getDefaultMQProducerImpl();
for (Iterator<Map.Entry<String, SendResult>> iter =
cacheMap.entrySet().iterator(); iter.hasNext();) {
try {
Entry<String, SendResult> entry = iter.next();
System.out.println("doEndTransaction : " +
localTransactionState);
// Manually commit or roll back
defaultMQProducerImpl.endTransaction(entry.getValue(), localTransactionState,
null);
iter.remove();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class SelfTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("check local transaction status :" + new
String(msg.getBody()));
String transactionId = msg.getTransactionId();
System.out.println("transactionId:" + transactionId);
Map<String, SendResult> cacheMap =
ProducerUtils.getTxMessageCacheMap();
SendResult sendResult = null;
if (cacheMap != null) {
sendResult = cacheMap.get(msg.getMsgId());
}
LocalTransactionState state = LocalTransactionState.UNKNOW;
if (sendResult == null && checkTxCommited(msg)) {
state = LocalTransactionState.COMMIT_MESSAGE;
} else if (sendResult == null) { // sendResult not in the cache
anymore
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
/**
* check tx state from database
*
* @param msg
* @return
*/
private boolean checkTxCommited(MessageExt msg) {
return false; // or true
}
}
@Service
public class TxService {
@Transactional
public void doInTx(String msg, String key) {
// Do some operations on the database with transaction
// send transaction message and executeLocalTransaction always
return LocalTransactionState.UNKNOW
// Because it's in localTransaction right now
ProducerUtils.sendPrepare(msg, key);
TransactionSynchronizationManager.registerSynchronization(new
TransactionSynchronization() {
// When the spring transaction is complete, I will
manually commit or roll back the transaction message sent earlier
@Override
public void afterCompletion(int state) {
if (TransactionSynchronization.STATUS_COMMITTED
== state) {
ProducerUtils.commit(); // commit
} else if
(TransactionSynchronization.STATUS_ROLLED_BACK == state) {
ProducerUtils.rollback(); // rollback
}
}
});
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]