artemlivshits commented on code in PR #19539:
URL: https://github.com/apache/kafka/pull/19539#discussion_r2082499313

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -330,6 +334,17 @@ public synchronized void beginTransaction() {
         transitionTo(State.IN_TRANSACTION);
     }
 
+    /**
+     * Begin preparing a transaction for a two-phase commit.
+     * This transitions the transaction to the PREPARED_TRANSACTION state.
+     */
+    public synchronized void beginPrepare() {

Review Comment:
   We should probably just name it `prepare`; the other functions have a "begin" in front because they are asynchronous.



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -760,6 +765,48 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         }
     }
 
+    /**
+     * Prepares the current transaction for a two-phase commit. This method will flush all pending messages
+     * and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
+     * or completeTransaction(PreparedTxnState) may be called.
+     * <p>
+     * This method is used as part of a two-phase commit protocol:
+     * <ol>
+     *   <li>Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.</li>
+     *   <li>Make any external system changes that need to be atomic with this transaction.</li>
+     *   <li>Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or
+     *       completeTransaction(PreparedTxnState).</li>
+     * </ol>
+     *
+     * @return the prepared transaction state to use when completing the transaction
+     *
+     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet.
+     * @throws InvalidTxnStateException if the producer is not in a state where preparing
+     *         a transaction is possible or 2PC is not enabled.
+     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
+     * @throws UnsupportedVersionException fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
+     * @throws AuthorizationException fatal error indicating that the configured
+     *         transactional.id is not authorized. See the exception for more details
+     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for preparing the transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public PreparedTxnState prepareTransaction() throws ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+        if (!transactionManager.is2PCEnabled()) {
+            throw new InvalidTxnStateException("Cannot prepare a transaction when 2PC is not enabled");
+        }
+        long now = time.nanoseconds();
+        flush();
+        transactionManager.beginPrepare();
+        sender.wakeup();

Review Comment:
   Why do we need to wake up the sender here?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1058,6 +1077,15 @@ synchronized boolean isInitializing() {
         return isTransactional() && currentState == State.INITIALIZING;
     }
 
+    /**
+     * Check if the transaction is in the preparing state.
+     *
+     * @return true if the current state is PREPARED_TRANSACTION
+     */
+    public synchronized boolean isInPreparingState() {

Review Comment:
   Probably rename to `isInPreparedState` or maybe just `isPrepared`.

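   To illustrate the two rename suggestions above (`prepare` and `isPrepared`), here is a minimal sketch. `TxnStateSketch` is a hypothetical, simplified stand-in and not the real `TransactionManager`; it assumes the move to `PREPARED_TRANSACTION` is a purely local, synchronous state change, which is why the method carries no `begin` prefix.

```java
// Hypothetical, simplified stand-in for TransactionManager (illustration only).
final class TxnStateSketch {
    enum State { READY, IN_TRANSACTION, PREPARED_TRANSACTION }

    private State currentState = State.READY;

    synchronized void beginTransaction() {
        requireState(State.READY);
        currentState = State.IN_TRANSACTION;
    }

    // Named `prepare` rather than `beginPrepare`: in this sketch the transition
    // finishes before the method returns, so there is nothing asynchronous to "begin".
    synchronized void prepare() {
        requireState(State.IN_TRANSACTION);
        currentState = State.PREPARED_TRANSACTION;
    }

    // Named after the state it checks (PREPARED_TRANSACTION), not "preparing".
    synchronized boolean isPrepared() {
        return currentState == State.PREPARED_TRANSACTION;
    }

    private void requireState(State expected) {
        if (currentState != expected) {
            throw new IllegalStateException("Expected " + expected + " but was " + currentState);
        }
    }

    public static void main(String[] args) {
        TxnStateSketch txn = new TxnStateSketch();
        txn.beginTransaction();
        txn.prepare();
        System.out.println("prepared = " + txn.isPrepared());
    }
}
```

   The state check in `requireState` is an assumption of the sketch; the real class validates transitions in its own way.
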
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1453,7 +1481,15 @@ public void handleResponse(AbstractResponse response) {
                 ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
                         initProducerIdResponse.data().producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
-                transitionTo(State.READY);
+
+                // If this is a 2PC-enabled transaction with keepPreparedTxn=true, transition directly
+                // to PREPARED_TRANSACTION
+                if (enable2PC && builder.data.keepPreparedTxn()) {
+                    transitionTo(State.PREPARED_TRANSACTION);

Review Comment:
   Note that the transaction may not be ongoing on the broker. Did we verify that commit / abort works in that case?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -760,6 +765,48 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         }
     }
 
+    /**
+     * Prepares the current transaction for a two-phase commit. This method will flush all pending messages
+     * and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
+     * or completeTransaction(PreparedTxnState) may be called.
+     * <p>
+     * This method is used as part of a two-phase commit protocol:
+     * <ol>
+     *   <li>Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.</li>
+     *   <li>Make any external system changes that need to be atomic with this transaction.</li>
+     *   <li>Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or
+     *       completeTransaction(PreparedTxnState).</li>
+     * </ol>
+     *
+     * @return the prepared transaction state to use when completing the transaction
+     *
+     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet.
+     * @throws InvalidTxnStateException if the producer is not in a state where preparing
+     *         a transaction is possible or 2PC is not enabled.
+     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
+     * @throws UnsupportedVersionException fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
+     * @throws AuthorizationException fatal error indicating that the configured
+     *         transactional.id is not authorized. See the exception for more details
+     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for preparing the transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public PreparedTxnState prepareTransaction() throws ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();

Review Comment:
   Do we also want to `throwIfInPreparedState();` here?

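   A rough sketch of the guard asked about above, under stated assumptions: `PrepareGuardSketch` is a hypothetical stand-in for the producer, and `IllegalStateException` stands in for whatever exception the real `throwIfInPreparedState()` would throw. The only point is that a second `prepareTransaction()` call on an already prepared transaction is rejected before any flush or state change happens.

```java
// Hypothetical stand-in for the producer (illustration only, not KafkaProducer).
final class PrepareGuardSketch {
    private boolean inTransaction = false;
    private boolean prepared = false;

    void beginTransaction() {
        throwIfInPreparedState();
        inTransaction = true;
    }

    void prepareTransaction() {
        // Guard first, so a repeated prepare fails fast without side effects.
        throwIfInPreparedState();
        if (!inTransaction) {
            throw new IllegalStateException("No transaction has been started");
        }
        prepared = true;
    }

    boolean isPrepared() {
        return prepared;
    }

    // Assumed behavior: reject any call that is not valid once the transaction is prepared.
    private void throwIfInPreparedState() {
        if (prepared) {
            throw new IllegalStateException("Transaction is already prepared");
        }
    }

    public static void main(String[] args) {
        PrepareGuardSketch producer = new PrepareGuardSketch();
        producer.beginTransaction();
        producer.prepareTransaction();
        try {
            producer.prepareTransaction();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
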