artemlivshits commented on code in PR #19193:
URL: https://github.com/apache/kafka/pull/19193#discussion_r1996307814

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         // if transactional id is empty then return error as invalid request. This is
         // to make TransactionCoordinator's behavior consistent with producer client
         responseCallback(initTransactionError(Errors.INVALID_REQUEST))
-      } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+      } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+        // if the request is to enable two-phase commit but the broker 2PC config is set to false,
+        // then return an error.
+        responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))

Review Comment:
   The KIP says we should return `TRANSACTIONAL_ID_AUTHORIZATION_FAILED` in this case.

##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfig.java:
##########
@@ -47,25 +48,33 @@ public final class TransactionStateManagerConfig {
     public static final int TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT = (int) TimeUnit.HOURS.toMillis(1);
     public static final String TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing";
+    public static final String TRANSACTIONS_2PC_ENABLED_CONFIG = "transaction.two.phase.commit.enable";
+    public static final boolean TRANSACTIONS_2PC_ENABLED_DEFAULT = false;
+    public static final String TRANSACTIONS_2PC_ENABLED_DOC = "Enable to allow participation in Two-Phase Commit (2PC) transactions with an external transaction coordinator";

Review Comment:
   Maybe just "Allow participation ..."

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -124,10 +127,18 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         // if transactional id is empty then return error as invalid request. This is
         // to make TransactionCoordinator's behavior consistent with producer client
         responseCallback(initTransactionError(Errors.INVALID_REQUEST))
-      } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+      } else if (enableTwoPCFlag && !txnManager.isTransaction2pcEnabled()) {
+        // if the request is to enable two-phase commit but the broker 2PC config is set to false,
+        // then return an error.
+        responseCallback(initTransactionError(Errors.INVALID_TXN_STATE))
+      } else if (keepPreparedTxn) {
+        // if the request is to keep the prepared transaction, then return an unsupported version error.

Review Comment:
   Could you clarify in the comment that this error is returned because this feature is not implemented yet?
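
For illustration only, here is a minimal Java sketch of the check ordering these comments discuss. It is not the coordinator's code: `InitProducerIdChecks`, `ErrorCode`, and `validate` are hypothetical names, and the enum is a local stand-in rather than Kafka's `Errors`. It assumes the disabled-2PC case maps to `TRANSACTIONAL_ID_AUTHORIZATION_FAILED`, as the review says the KIP specifies (the PR as posted returns `INVALID_TXN_STATE`), and that a keep-prepared request yields an unsupported version error because the feature is not implemented yet.

```java
public class InitProducerIdChecks {
    // Local stand-in for the error codes named in the review (hypothetical, not Kafka's Errors enum).
    enum ErrorCode {
        NONE,
        INVALID_REQUEST,
        TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
        UNSUPPORTED_VERSION
    }

    // Applies the checks in the same order as the else-if chain in the diff.
    static ErrorCode validate(String transactionalId,
                              boolean enableTwoPC,
                              boolean keepPreparedTxn,
                              boolean broker2pcEnabled) {
        if (transactionalId.isEmpty()) {
            // An empty transactional id is an invalid request.
            return ErrorCode.INVALID_REQUEST;
        }
        if (enableTwoPC && !broker2pcEnabled) {
            // Client asked for 2PC but the broker config disables it.
            // Assumption: error code taken from the reviewer's reading of the KIP.
            return ErrorCode.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
        }
        if (keepPreparedTxn) {
            // Keeping a prepared transaction is not implemented yet,
            // so the request is rejected as unsupported.
            return ErrorCode.UNSUPPORTED_VERSION;
        }
        return ErrorCode.NONE;
    }

    public static void main(String[] args) {
        // 2PC requested while the broker has it disabled.
        System.out.println(validate("txn-1", true, false, false));
        // 2PC enabled on the broker, but keepPreparedTxn is requested.
        System.out.println(validate("txn-1", true, true, true));
    }
}
```

Because the 2PC check precedes the keep-prepared check, a request that sets both flags against a broker with 2PC disabled reports the authorization failure rather than the unsupported version error.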