artemlivshits commented on code in PR #19539:
URL: https://github.com/apache/kafka/pull/19539#discussion_r2076695831


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -978,6 +1022,14 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, 
V> record, Callback call
 
         try {
             throwIfProducerClosed();
+            
+            // Check if we're in a prepared transaction state, in which case 
send is not allowed

Review Comment:
   We need to do the same in `sendOffsetsToTransaction` method at least.  Also, 
could you check that `beginTransaction` also fails if in prepared state and if 
not, add a check and a test?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -150,6 +151,7 @@ private enum State {
         INITIALIZING,
         READY,
         IN_TRANSACTION,
+        PREPARING_TRANSACTION,

Review Comment:
   `PREPARED_TRANSACTION`



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -165,13 +167,15 @@ private boolean isTransitionValid(State source, State 
target) {
                     return source == INITIALIZING || source == 
COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
                 case IN_TRANSACTION:
                     return source == READY;
+                case PREPARING_TRANSACTION:
+                    return source == IN_TRANSACTION || source == INITIALIZING;
                 case COMMITTING_TRANSACTION:
-                    return source == IN_TRANSACTION;
+                    return source == IN_TRANSACTION || source == 
PREPARING_TRANSACTION;
                 case ABORTING_TRANSACTION:
-                    return source == IN_TRANSACTION || source == 
ABORTABLE_ERROR;
+                    return source == IN_TRANSACTION || source == 
PREPARING_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
                     return source == IN_TRANSACTION || source == 
COMMITTING_TRANSACTION || source == ABORTABLE_ERROR
-                            || source == INITIALIZING;
+                            || source == INITIALIZING || source == 
PREPARING_TRANSACTION;

Review Comment:
   Once transaction is prepared, it cannot get an abortable error.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -330,6 +334,17 @@ public synchronized void beginTransaction() {
         transitionTo(State.IN_TRANSACTION);
     }
 
+    /**
+     * Begin preparing a transaction for a two-phase commit.
+     * This transitions the transaction to the PREPARING_TRANSACTION state.
+     */
+    public synchronized void beginPrepare() {
+        ensureTransactional();
+        throwIfPendingState("prepareTransaction");
+        maybeFailWithError();
+        transitionTo(State.PREPARING_TRANSACTION);

Review Comment:
   We should also transition into prepared state, if `keepPreparedTxn` was 
`true` and the transaction is ongoing on the TC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to