artemlivshits commented on code in PR #19539: URL: https://github.com/apache/kafka/pull/19539#discussion_r2076695831
########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -978,6 +1022,14 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call try { throwIfProducerClosed(); + + // Check if we're in a prepared transaction state, in which case send is not allowed Review Comment: We need to do the same in `sendOffsetsToTransaction` method at least. Also, could you check that `beginTransaction` also fails if in prepared state and if not, add a check and a test? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -150,6 +151,7 @@ private enum State { INITIALIZING, READY, IN_TRANSACTION, + PREPARING_TRANSACTION, Review Comment: `PREPARED_TRANSACTION` ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -165,13 +167,15 @@ private boolean isTransitionValid(State source, State target) { return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: return source == READY; + case PREPARING_TRANSACTION: + return source == IN_TRANSACTION || source == INITIALIZING; case COMMITTING_TRANSACTION: - return source == IN_TRANSACTION; + return source == IN_TRANSACTION || source == PREPARING_TRANSACTION; case ABORTING_TRANSACTION: - return source == IN_TRANSACTION || source == ABORTABLE_ERROR; + return source == IN_TRANSACTION || source == PREPARING_TRANSACTION || source == ABORTABLE_ERROR; case ABORTABLE_ERROR: return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR - || source == INITIALIZING; + || source == INITIALIZING || source == PREPARING_TRANSACTION; Review Comment: Once transaction is prepared, it cannot get an abortable error. ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -330,6 +334,17 @@ public synchronized void beginTransaction() { transitionTo(State.IN_TRANSACTION); } + /** + * Begin preparing a transaction for a two-phase commit. + * This transitions the transaction to the PREPARING_TRANSACTION state. + */ + public synchronized void beginPrepare() { + ensureTransactional(); + throwIfPendingState("prepareTransaction"); + maybeFailWithError(); + transitionTo(State.PREPARING_TRANSACTION); Review Comment: We should also transition into prepared state, if `keepPreparedTxn` was `true` and the transaction is ongoing on the TC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org