artemlivshits commented on code in PR #19193:
URL: https://github.com/apache/kafka/pull/19193#discussion_r1996307814


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       // if transactional id is empty then return error as invalid request. This is
       // to make TransactionCoordinator's behavior consistent with producer client
       responseCallback(initTransactionError(Errors.INVALID_REQUEST))
-    } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+    } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+      // if the request is to enable two-phase commit but the broker 2PC config is set to false,
+      // then return an error.
+      responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))

Review Comment:
   The KIP says we should return `TRANSACTIONAL_ID_AUTHORIZATION_FAILED` in 
this case.
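
   A minimal sketch of the check with the error code suggested above. It is illustrative only: `TwoPcCheckSketch`, `Err`, and `checkTwoPc` are hypothetical names, and the local enum merely stands in for the corresponding values of Kafka's `Errors` enum; the real coordinator code is the Scala in the hunk.

```java
public class TwoPcCheckSketch {
    // Hypothetical stand-in for the relevant values of Kafka's Errors enum.
    enum Err { NONE, TRANSACTIONAL_ID_AUTHORIZATION_FAILED }

    // enableTwoPc: the flag carried by the request (assumed name).
    // brokerTwoPcEnabled: the broker-side 2PC config value (assumed name).
    static Err checkTwoPc(boolean enableTwoPc, boolean brokerTwoPcEnabled) {
        if (enableTwoPc && !brokerTwoPcEnabled) {
            // The request asks for 2PC but the broker has it disabled.
            return Err.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
        }
        return Err.NONE;
    }

    public static void main(String[] args) {
        System.out.println(checkTwoPc(true, false));
        System.out.println(checkTwoPc(true, true));
    }
}
```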



##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java:
##########
@@ -47,25 +48,33 @@ public final class TransactionStateManagerConfig {
     public static final int TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT = (int) TimeUnit.HOURS.toMillis(1);
     public static final String TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing";
 
+    public static final String TRANSACTIONS_2PC_ENABLED_CONFIG = "transaction.two.phase.commit.enable";
+    public static final boolean TRANSACTIONS_2PC_ENABLED_DEFAULT = false;
+    public static final String TRANSACTIONS_2PC_ENABLED_DOC = "Enable to allow participation in Two-Phase Commit (2PC) transactions with an external transaction coordinator";

Review Comment:
   Maybe just "Allow participation ..."



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
       // if transactional id is empty then return error as invalid request. This is
       // to make TransactionCoordinator's behavior consistent with producer client
       responseCallback(initTransactionError(Errors.INVALID_REQUEST))
-    } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+    } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+      // if the request is to enable two-phase commit but the broker 2PC config is set to false,
+      // then return an error.
+      responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))
+    } else if (keepPreparedTxn) {
+      // if the request is to keep the prepared transaction, then return an unsupported version error.

Review Comment:
   Could you clarify in the comment that this error is returned because this 
feature is not implemented yet?


