This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 74f3aec4 [ISSUE #688] Add information about transaction Source on endTransaction Request (#689) 74f3aec4 is described below commit 74f3aec491a269181215fe3b3cd2bbbcf14678a9 Author: Jixiang Jin <lolli...@apache.org> AuthorDate: Wed Feb 28 15:26:55 2024 +0800 [ISSUE #688] Add information about transaction Source on endTransaction Request (#689) --- .../client/java/impl/producer/ProducerImpl.java | 12 +++++++----- .../client/java/impl/producer/TransactionImpl.java | 21 +++++++++++++++++---- .../java/impl/producer/TransactionImplTest.java | 5 +++-- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index 450a68d5..a17d1958 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -17,8 +17,6 @@ package org.apache.rocketmq.client.java.impl.producer; -import static com.google.common.base.Preconditions.checkNotNull; - import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.EndTransactionRequest; @@ -29,6 +27,8 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.Status; +import apache.rocketmq.v2.TransactionSource; +import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -163,7 +163,7 @@ class ProducerImpl extends ClientImpl implements Producer { } final GeneralMessage generalMessage = new GeneralMessageImpl(messageView); endTransaction(endpoints, generalMessage, messageView.getMessageId(), - transactionId, resolution); + transactionId, resolution, TransactionSource.SOURCE_SERVER_CHECK); } catch (Throwable t) { log.error("Exception raised while ending the transaction, messageId={}, transactionId={}, " + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t); @@ -241,7 +241,7 @@ class ProducerImpl extends ClientImpl implements Producer { */ @Override public Transaction beginTransaction() { - checkNotNull(checker, "Transaction checker should not be null"); + Preconditions.checkNotNull(checker, "Transaction checker should not be null"); if (!this.isRunning()) { log.error("Unable to begin a transaction because producer is not running, state={}, clientId={}", this.state(), clientId); @@ -256,9 +256,11 @@ class ProducerImpl extends ClientImpl implements Producer { } public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, MessageId messageId, - String transactionId, final TransactionResolution resolution) throws ClientException { + String transactionId, final TransactionResolution resolution, final TransactionSource transactionSource) + throws ClientException { final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder() .setMessageId(messageId.toString()).setTransactionId(transactionId) + .setSource(transactionSource) .setTopic(apache.rocketmq.v2.Resource.newBuilder() .setResourceNamespace(clientConfiguration.getNamespace()) .setName(generalMessage.getTopic()) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java index 51ca92cc..9bc97c24 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.java.impl.producer; +import apache.rocketmq.v2.TransactionSource; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.io.IOException; import java.util.HashSet; @@ -94,8 +95,14 @@ class TransactionImpl implements Transaction { for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) { final PublishingMessageImpl publishingMessage = entry.getKey(); final SendReceiptImpl sendReceipt = entry.getValue(); - producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage), - sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT); + producerImpl.endTransaction( + sendReceipt.getEndpoints(), + new GeneralMessageImpl(publishingMessage), + sendReceipt.getMessageId(), + sendReceipt.getTransactionId(), + TransactionResolution.COMMIT, + TransactionSource.SOURCE_CLIENT + ); } } @@ -107,8 +114,14 @@ class TransactionImpl implements Transaction { for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) { final PublishingMessageImpl publishingMessage = entry.getKey(); final SendReceiptImpl sendReceipt = entry.getValue(); - producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage), - sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK); + producerImpl.endTransaction( + sendReceipt.getEndpoints(), + new GeneralMessageImpl(publishingMessage), + sendReceipt.getMessageId(), + sendReceipt.getTransactionId(), + TransactionResolution.ROLLBACK, + TransactionSource.SOURCE_CLIENT + ); } } } diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java index 6cca321b..68d05078 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import apache.rocketmq.v2.TransactionSource; import java.io.IOException; import java.util.HashSet; import java.util.Set; @@ -115,7 +116,7 @@ public class TransactionImplTest extends TestBase { final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0)); transaction.tryAddReceipt(publishingMessage, sendReceipt); Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class), - any(MessageId.class), anyString(), any(TransactionResolution.class)); + any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class)); transaction.commit(); } @@ -130,7 +131,7 @@ public class TransactionImplTest extends TestBase { final SendReceiptImpl sendReceipt = fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0)); transaction.tryAddReceipt(publishingMessage, sendReceipt); Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), any(GeneralMessage.class), - any(MessageId.class), anyString(), any(TransactionResolution.class)); + any(MessageId.class), anyString(), any(TransactionResolution.class), any(TransactionSource.class)); transaction.rollback(); } } \ No newline at end of file