artemlivshits commented on code in PR #19429: URL: https://github.com/apache/kafka/pull/19429#discussion_r2051049933
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext, this.retryBackoffMs = retryBackoffMs; this.txnPartitionMap = new TxnPartitionMap(logContext); this.apiVersions = apiVersions; + this.enable2PC = enable2PC; } void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); } public synchronized TransactionalRequestResult initializeTransactions() { - return initializeTransactions(ProducerIdAndEpoch.NONE); + return initializeTransactions(ProducerIdAndEpoch.NONE, false); Review Comment: For an internal class do we need an overload or we could just call `initializeTransactions(false)` directly? ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -654,6 +657,46 @@ public void initTransactions() { transactionManager.maybeUpdateTransactionV2Enabled(true); } + /** + * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but + * with additional handling for two-phase commit (2PC). Must be called before any send operations Review Comment: The way it's defined in the KIP, using `keepPreparedTxn` isn't coupled with 2PC -- while it's essential for 2PC support, it can also be called when 2PC isn't used. ########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -654,6 +657,46 @@ public void initTransactions() { transactionManager.maybeUpdateTransactionV2Enabled(true); } + /** + * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but + * with additional handling for two-phase commit (2PC). Must be called before any send operations + * that require a {@code transactionalId}. + * <p> + * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to + * {@code true}, the producer does <em>not</em> automatically abort existing transactions + * in the “prepare” phase. Instead, it enters a recovery mode allowing only finalization + * of those previously prepared transactions. This behavior is crucial for 2PC scenarios, + * where transactions should remain intact until the external transaction manager decides + * whether to commit or abort. + * <p> + * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional + * initialization, aborting any unfinished transactions and resetting the producer for + * new writes. + * + * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC + * recovery), false to abort existing transactions and behave like + * the standard initTransactions + * + * @throws IllegalStateException if no {@code transactional.id} is configured + * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not + * support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured + * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC. + * @throws KafkaException if the producer encounters a fatal error or any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>. + * @throws InterruptException if the thread is interrupted while blocked + */ + public void initTransactions(boolean keepPreparedTxn) { Review Comment: Should we change `initTransactions()` to just call `initTransactions(false)`? Maybe even implement it as a `default` method in the `Producer` interface, so that we don't need to implement it in `MockProducer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org