artemlivshits commented on code in PR #19429:
URL: https://github.com/apache/kafka/pull/19429#discussion_r2051049933


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext,
         this.retryBackoffMs = retryBackoffMs;
         this.txnPartitionMap = new TxnPartitionMap(logContext);
         this.apiVersions = apiVersions;
+        this.enable2PC = enable2PC;
     }
 
     void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
         shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
-        return initializeTransactions(ProducerIdAndEpoch.NONE);
+        return initializeTransactions(ProducerIdAndEpoch.NONE, false);

Review Comment:
   For an internal class do we need an overload or we could just call 
`initializeTransactions(false)` directly?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -654,6 +657,46 @@ public void initTransactions() {
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
 
+    /**
+     * Initialize the transactional state for this producer, similar to {@link 
#initTransactions()} but
+     * with additional handling for two-phase commit (2PC). Must be called 
before any send operations

Review Comment:
   The way it's defined in the KIP, using `keepPreparedTxn` isn't coupled with 
2PC -- while it's essential for 2PC support, it can also be called when 2PC 
isn't used.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -654,6 +657,46 @@ public void initTransactions() {
         transactionManager.maybeUpdateTransactionV2Enabled(true);
     }
 
+    /**
+     * Initialize the transactional state for this producer, similar to {@link 
#initTransactions()} but
+     * with additional handling for two-phase commit (2PC). Must be called 
before any send operations
+     * that require a {@code transactionalId}.
+     * <p>
+     * Unlike the standard {@link #initTransactions()}, when {@code 
keepPreparedTxn} is set to
+     * {@code true}, the producer does <em>not</em> automatically abort 
existing transactions
+     * in the “prepare” phase. Instead, it enters a recovery mode allowing 
only finalization
+     * of those previously prepared transactions. This behavior is crucial for 
2PC scenarios,
+     * where transactions should remain intact until the external transaction 
manager decides
+     * whether to commit or abort.
+     * <p>
+     * When {@code keepPreparedTxn} is {@code false}, this behaves like the 
normal transactional
+     * initialization, aborting any unfinished transactions and resetting the 
producer for
+     * new writes.
+     *
+     * @param keepPreparedTxn true to retain any in-flight prepared 
transactions (necessary for 2PC
+     *                        recovery), false to abort existing transactions 
and behave like
+     *                        the standard initTransactions
+     *
+     * @throws IllegalStateException if no {@code transactional.id} is 
configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not
+     *         support transactions (i.e. if its version is lower than 
0.11.0.0)
+     * @throws 
org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the 
configured
+     *         {@code transactional.id} is unauthorized either for normal 
transaction writes or 2PC.
+     * @throws KafkaException if the producer encounters a fatal error or any 
other unexpected error
+     * @throws TimeoutException if the time taken for initialize the 
transaction has surpassed <code>max.block.ms</code>.
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    public void initTransactions(boolean keepPreparedTxn) {

Review Comment:
   Should we change `initTransactions()` to just call 
`initTransactions(false)`?  Maybe even implement it as a `default` method in 
the `Producer` interface, so that we don't need to implement it in 
`MockProducer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to