jolshan commented on code in PR #19751:
URL: https://github.com/apache/kafka/pull/19751#discussion_r2114887634


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -772,135 +793,159 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
          val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
          txnMetadata.inLock {
-            producerIdCopy = txnMetadata.producerId
-            producerEpochCopy = txnMetadata.producerEpoch
-            // PrepareEpochFence has slightly different epoch bumping logic so don't include it here.
-            // Note that, it can only happen when the current state is Ongoing.
-            isEpochFence = txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE)
-            // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata
-            val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId &&
-              producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
-            // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata.
-            val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1
-
-            val isValidEpoch = {
-              if (!isEpochFence) {
-                // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid. If the epoch is the
-                // same it actually indicates the next endTransaction call. Instead, we want to check the epoch matches with the epoch in the retry conditions.
-                // Return producer fenced even in the cases where the epoch is higher and could indicate an invalid state transition.
-                // Use the following criteria to determine if a v2 retry is valid:
-                txnMetadata.state match {
-                  case TransactionState.ONGOING | TransactionState.EMPTY | TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
-                    producerEpoch == txnMetadata.producerEpoch
-                  case TransactionState.PREPARE_COMMIT | TransactionState.PREPARE_ABORT =>
-                    retryOnEpochBump
-                  case TransactionState.COMPLETE_COMMIT | TransactionState.COMPLETE_ABORT =>
-                    retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch
+            // These checks are performed first so that we don't have to deal with intermediate states
+            // (including PrepareCommit and PrepareAbort) in already complex state machine.
+            if (txnMetadata.pendingTransitionInProgress && !txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE)) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || txnMetadata.state == TransactionState.PREPARE_ABORT) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else {
+              // Copy the data under lock, this is going to be used if we want to return success
+              // on retrying commit / abort.
+              producerIdCopy = txnMetadata.producerId
+              producerEpochCopy = txnMetadata.producerEpoch
+
+              // PrepareEpochFence has slightly different epoch bumping logic so don't include it here.
+              // Note that, it can only happen when the current state is Ongoing.
+              isEpochFence = txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE)
+
+              // Calculate retry conditions, note that they are going to be used only
+              // in CompleteCommit / CompleteAbort states, so we can use the semantics
+              // of the producerId / epoch that's applicable in those states.
+              // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata
+              val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId &&
+                producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0
+              // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata.
+              val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1
+
+              val isValidEpoch = {
+                if (!isEpochFence) {
+                  // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid. If the epoch is the
+                  // same it actually indicates the next endTransaction call. Instead, we want to check the epoch matches with the epoch in the retry conditions.
+                  // Return producer fenced even in the cases where the epoch is higher and could indicate an invalid state transition.
+                  // Use the following criteria to determine if a v2 retry is valid:
+                  txnMetadata.state match {
+                    case TransactionState.ONGOING | TransactionState.EMPTY | TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE =>
+                      producerEpoch == txnMetadata.clientProducerEpoch
+                    case TransactionState.PREPARE_COMMIT | TransactionState.PREPARE_ABORT =>
+                      false  // must not happen, as we already checked and returned concurrent transactions
+                    case TransactionState.COMPLETE_COMMIT | TransactionState.COMPLETE_ABORT =>
+                      retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch
+                  }
+                } else {
+                  // If the epoch is going to be fenced, it bumps the epoch differently with TV2.
+                  (!isFromClient || producerEpoch == txnMetadata.clientProducerEpoch) && producerEpoch >= txnMetadata.clientProducerEpoch
                 }
-              } else {
-                // If the epoch is going to be fenced, it bumps the epoch differently with TV2.
-                (!isFromClient || producerEpoch == txnMetadata.producerEpoch) && producerEpoch >= txnMetadata.producerEpoch
              }
-            }
-
-            val isRetry = retryOnEpochBump || retryOnOverflow
 
-            def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = {
-              // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
-              val nextProducerIdOrErrors =
-                if (!isEpochFence && txnMetadata.isProducerEpochExhausted) {
-                  try {
-                    Right(producerIdManager.generateProducerId())
-                  } catch {
-                    case e: Exception => Left(Errors.forException(e))
+              val isRetry = retryOnEpochBump || retryOnOverflow
+
+              def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = {
+                // Maybe allocate new producer ID if the epoch is exhausted
+                // Note that we can arrive at this scenario when a client did a few keep-prepared calls and then
+                // decided to do one without keep-prepared (e.g. to force-terminate), so if we have
+                // nextProducerEpoch, then we need to do the epoch bump even for the isEpochFence case.
+                val nextProducerIdOrErrors =
+                  if ((!isEpochFence || txnMetadata.hasNextProducerEpoch) && txnMetadata.isProducerEpochExhausted) {
+                    try {
+                      Right(producerIdManager.generateProducerId())
+                    } catch {
+                      case e: Exception => Left(Errors.forException(e))
+                    }
+                  } else {
+                    Right(txnMetadata.nextProducerId)
                   }
+
+                // If the next producer epoch is set (which can happen if we used InitProducerId(keepPreparedTxn))
+                // then we also need to bump this epoch -- we want to fence requests that were issued with that
+                // epoch after the transaction is complete.  Consider this example:
+                //  1. (initial state): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1
+                //  2. InitProducerId(keepPreparedTxn): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101

Review Comment:
   At step 2, are we returning the epoch 100 or 101 to the client? In other words, what is the epoch we send at step 3?
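   
   To make the question concrete, here is a toy sketch of the bookkeeping described in the new comment. `TxnSnapshot` is a made-up holder for the four fields from the example, not the real `TransactionMetadata`, and it doesn't assert which epoch the response carries:
   
   ```scala
   // Hypothetical snapshot of the fields in the example above (illustration only).
   case class TxnSnapshot(
     state: String,
     producerId: Long,
     producerEpoch: Short,
     nextProducerId: Long,
     nextProducerEpoch: Short
   )
   
   // Step 1 (initial state): Ongoing, epoch 100, nothing reserved for the next producer.
   val step1 = TxnSnapshot("Ongoing", producerId = 42L, producerEpoch = 100, nextProducerId = -1L, nextProducerEpoch = -1)
   
   // Step 2 (after InitProducerId(keepPreparedTxn)): same producerId / producerEpoch,
   // with the bumped epoch 101 parked in nextProducerEpoch.
   val step2 = step1.copy(nextProducerId = 42L, nextProducerEpoch = 101)
   
   // Open question: does the InitProducerId response (and hence the client's step-3 request)
   // carry step2.producerEpoch (100) or step2.nextProducerEpoch (101)?
   ```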



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
