This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f3aec4 [ISSUE #688] Add information about transaction Source on endTransaction Request (#689)
74f3aec4 is described below

commit 74f3aec491a269181215fe3b3cd2bbbcf14678a9
Author: Jixiang Jin <lolli...@apache.org>
AuthorDate: Wed Feb 28 15:26:55 2024 +0800

    [ISSUE #688] Add information about transaction Source on endTransaction Request (#689)
---
 .../client/java/impl/producer/ProducerImpl.java     | 12 +++++++-----
 .../client/java/impl/producer/TransactionImpl.java  | 21 +++++++++++++++++----
 .../java/impl/producer/TransactionImplTest.java     |  5 +++--
 3 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 450a68d5..a17d1958 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.EndTransactionRequest;
@@ -29,6 +27,8 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.TransactionSource;
+import com.google.common.base.Preconditions;
 import com.google.common.math.IntMath;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -163,7 +163,7 @@ class ProducerImpl extends ClientImpl implements Producer {
                     }
                     final GeneralMessage generalMessage = new 
GeneralMessageImpl(messageView);
                     endTransaction(endpoints, generalMessage, 
messageView.getMessageId(),
-                        transactionId, resolution);
+                        transactionId, resolution, 
TransactionSource.SOURCE_SERVER_CHECK);
                 } catch (Throwable t) {
                     log.error("Exception raised while ending the transaction, 
messageId={}, transactionId={}, "
                         + "endpoints={}, clientId={}", messageId, 
transactionId, endpoints, clientId, t);
@@ -241,7 +241,7 @@ class ProducerImpl extends ClientImpl implements Producer {
      */
     @Override
     public Transaction beginTransaction() {
-        checkNotNull(checker, "Transaction checker should not be null");
+        Preconditions.checkNotNull(checker, "Transaction checker should not be 
null");
         if (!this.isRunning()) {
             log.error("Unable to begin a transaction because producer is not 
running, state={}, clientId={}",
                 this.state(), clientId);
@@ -256,9 +256,11 @@ class ProducerImpl extends ClientImpl implements Producer {
     }
 
     public void endTransaction(Endpoints endpoints, GeneralMessage 
generalMessage, MessageId messageId,
-        String transactionId, final TransactionResolution resolution) throws 
ClientException {
+        String transactionId, final TransactionResolution resolution, final 
TransactionSource transactionSource)
+        throws ClientException {
         final EndTransactionRequest.Builder builder = 
EndTransactionRequest.newBuilder()
             .setMessageId(messageId.toString()).setTransactionId(transactionId)
+            .setSource(transactionSource)
             .setTopic(apache.rocketmq.v2.Resource.newBuilder()
                 .setResourceNamespace(clientConfiguration.getNamespace())
                 .setName(generalMessage.getTopic())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index 51ca92cc..9bc97c24 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import apache.rocketmq.v2.TransactionSource;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.HashSet;
@@ -94,8 +95,14 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : 
messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), new 
GeneralMessageImpl(publishingMessage),
-                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), 
TransactionResolution.COMMIT);
+            producerImpl.endTransaction(
+                sendReceipt.getEndpoints(),
+                new GeneralMessageImpl(publishingMessage),
+                sendReceipt.getMessageId(),
+                sendReceipt.getTransactionId(),
+                TransactionResolution.COMMIT,
+                TransactionSource.SOURCE_CLIENT
+            );
         }
     }
 
@@ -107,8 +114,14 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : 
messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), new 
GeneralMessageImpl(publishingMessage),
-                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), 
TransactionResolution.ROLLBACK);
+            producerImpl.endTransaction(
+                sendReceipt.getEndpoints(),
+                new GeneralMessageImpl(publishingMessage),
+                sendReceipt.getMessageId(),
+                sendReceipt.getTransactionId(),
+                TransactionResolution.ROLLBACK,
+                TransactionSource.SOURCE_CLIENT
+            );
         }
     }
 }
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 6cca321b..68d05078 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 
+import apache.rocketmq.v2.TransactionSource;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -115,7 +116,7 @@ public class TransactionImplTest extends TestBase {
         final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
         transaction.tryAddReceipt(publishingMessage, sendReceipt);
         
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), 
any(GeneralMessage.class),
-            any(MessageId.class), anyString(), 
any(TransactionResolution.class));
+            any(MessageId.class), anyString(), 
any(TransactionResolution.class), any(TransactionSource.class));
         transaction.commit();
     }
 
@@ -130,7 +131,7 @@ public class TransactionImplTest extends TestBase {
         final SendReceiptImpl sendReceipt = 
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
         transaction.tryAddReceipt(publishingMessage, sendReceipt);
         
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class), 
any(GeneralMessage.class),
-            any(MessageId.class), anyString(), 
any(TransactionResolution.class));
+            any(MessageId.class), anyString(), 
any(TransactionResolution.class), any(TransactionSource.class));
         transaction.rollback();
     }
 }
\ No newline at end of file

Reply via email to