artemlivshits commented on code in PR #19539:
URL: https://github.com/apache/kafka/pull/19539#discussion_r2082499313


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -330,6 +334,17 @@ public synchronized void beginTransaction() {
         transitionTo(State.IN_TRANSACTION);
     }
 
+    /**
+     * Begin preparing a transaction for a two-phase commit.
+     * This transitions the transaction to the PREPARED_TRANSACTION state.
+     */
+    public synchronized void beginPrepare() {

Review Comment:
   We should probably just name it `prepare`, the other functions have a 
"begin" in front because they are asynchronous.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -760,6 +765,48 @@ public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offs
         }
     }
 
+    /**
+     * Prepares the current transaction for a two-phase commit. This method 
will flush all pending messages
+     * and transition the producer into a mode where only {@link 
#commitTransaction()}, {@link #abortTransaction()},
+     * or completeTransaction(PreparedTxnState) may be called.
+     * <p>
+     * This method is used as part of a two-phase commit protocol:
+     * <ol>
+     *   <li>Prepare the transaction by calling this method. This returns a 
{@link PreparedTxnState} if successful.</li>
+     *   <li>Make any external system changes that need to be atomic with this 
transaction.</li>
+     *   <li>Complete the transaction by calling {@link #commitTransaction()}, 
{@link #abortTransaction()} or
+     *       completeTransaction(PreparedTxnState).</li>
+     * </ol>
+     *
+     * @return the prepared transaction state to use when completing the 
transaction
+     *
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started yet.
+     * @throws InvalidTxnStateException if the producer is not in a state 
where preparing
+     *         a transaction is possible or 2PC is not enabled.
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws UnsupportedVersionException fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws AuthorizationException fatal error indicating that the 
configured
+     *         transactional.id is not authorized. See the exception for more 
details
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws TimeoutException if the time taken for preparing the 
transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+        if (!transactionManager.is2PCEnabled()) {
+            throw new InvalidTxnStateException("Cannot prepare a transaction 
when 2PC is not enabled");
+        }
+        long now = time.nanoseconds();
+        flush();
+        transactionManager.beginPrepare();
+        sender.wakeup();

Review Comment:
   Why do we need to wakeup?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1058,6 +1077,15 @@ synchronized boolean isInitializing() {
         return isTransactional() && currentState == State.INITIALIZING;
     }
 
+    /**
+     * Check if the transaction is in the preparing state.
+     *
+     * @return true if the current state is PREPARED_TRANSACTION
+     */
+    public synchronized boolean isInPreparingState() {

Review Comment:
   Probably rename to `isInPreparedState` or maybe just `isPrepared`.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1453,7 +1481,15 @@ public void handleResponse(AbstractResponse response) {
                 ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
                         initProducerIdResponse.data().producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
-                transitionTo(State.READY);
+
+                // If this is a 2PC-enabled transaction with 
keepPreparedTxn=true, transition directly
+                // to PREPARED_TRANSACTION
+                if (enable2PC && builder.data.keepPreparedTxn()) {
+                    transitionTo(State.PREPARED_TRANSACTION);

Review Comment:
   Note that the transaction may not be ongoing on the broker.  Did we verify 
that commit / abort works in that case?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -760,6 +765,48 @@ public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offs
         }
     }
 
+    /**
+     * Prepares the current transaction for a two-phase commit. This method 
will flush all pending messages
+     * and transition the producer into a mode where only {@link 
#commitTransaction()}, {@link #abortTransaction()},
+     * or completeTransaction(PreparedTxnState) may be called.
+     * <p>
+     * This method is used as part of a two-phase commit protocol:
+     * <ol>
+     *   <li>Prepare the transaction by calling this method. This returns a 
{@link PreparedTxnState} if successful.</li>
+     *   <li>Make any external system changes that need to be atomic with this 
transaction.</li>
+     *   <li>Complete the transaction by calling {@link #commitTransaction()}, 
{@link #abortTransaction()} or
+     *       completeTransaction(PreparedTxnState).</li>
+     * </ol>
+     *
+     * @return the prepared transaction state to use when completing the 
transaction
+     *
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started yet.
+     * @throws InvalidTxnStateException if the producer is not in a state 
where preparing
+     *         a transaction is possible or 2PC is not enabled.
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws UnsupportedVersionException fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws AuthorizationException fatal error indicating that the 
configured
+     *         transactional.id is not authorized. See the exception for more 
details
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws TimeoutException if the time taken for preparing the 
transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();

Review Comment:
   Do we also want to `throwIfInPreparedState();` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to