jolshan commented on code in PR #19751: URL: https://github.com/apache/kafka/pull/19751#discussion_r2114887634
########## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ########## @@ -772,135 +793,159 @@ class TransactionCoordinator(txnConfig: TransactionConfig, val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch txnMetadata.inLock { - producerIdCopy = txnMetadata.producerId - producerEpochCopy = txnMetadata.producerEpoch - // PrepareEpochFence has slightly different epoch bumping logic so don't include it here. - // Note that, it can only happen when the current state is Ongoing. - isEpochFence = txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE) - // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata - val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId && - producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0 - // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata. - val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1 - - val isValidEpoch = { - if (!isEpochFence) { - // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid. If the epoch is the - // same it actually indicates the next endTransaction call. Instead, we want to check the epoch matches with the epoch in the retry conditions. - // Return producer fenced even in the cases where the epoch is higher and could indicate an invalid state transition. 
- // Use the following criteria to determine if a v2 retry is valid: - txnMetadata.state match { - case TransactionState.ONGOING | TransactionState.EMPTY | TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE => - producerEpoch == txnMetadata.producerEpoch - case TransactionState.PREPARE_COMMIT | TransactionState.PREPARE_ABORT => - retryOnEpochBump - case TransactionState.COMPLETE_COMMIT | TransactionState.COMPLETE_ABORT => - retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch + // These checks are performed first so that we don't have to deal with intermediate states + // (including PrepareCommit and PrepareAbort) in already complex state machine. + if (txnMetadata.pendingTransitionInProgress && !txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE)) { + Left(Errors.CONCURRENT_TRANSACTIONS) + } else if (txnMetadata.state == TransactionState.PREPARE_COMMIT || txnMetadata.state == TransactionState.PREPARE_ABORT) { + Left(Errors.CONCURRENT_TRANSACTIONS) + } else { + // Copy the data under lock, this is going to be used if we want to return success + // on retrying commit / abort. + producerIdCopy = txnMetadata.producerId + producerEpochCopy = txnMetadata.producerEpoch + + // PrepareEpochFence has slightly different epoch bumping logic so don't include it here. + // Note that, it can only happen when the current state is Ongoing. + isEpochFence = txnMetadata.pendingState.contains(TransactionState.PREPARE_EPOCH_FENCE) + + // Calculate retry conditions, note that they are going to be used only + // in CompleteCommit / CompleteAbort states, so we can use the semantics + // of the producerId / epoch that's applicable in those states. 
+ // True if the client retried a request that had overflowed the epoch, and a new producer ID is stored in the txnMetadata + val retryOnOverflow = !isEpochFence && txnMetadata.prevProducerId == producerId && + producerEpoch == Short.MaxValue - 1 && txnMetadata.producerEpoch == 0 + // True if the client retried an endTxn request, and the bumped producer epoch is stored in the txnMetadata. + val retryOnEpochBump = !isEpochFence && txnMetadata.producerEpoch == producerEpoch + 1 + + val isValidEpoch = { + if (!isEpochFence) { + // With transactions V2, state + same epoch is not sufficient to determine if a retry transition is valid. If the epoch is the + // same it actually indicates the next endTransaction call. Instead, we want to check the epoch matches with the epoch in the retry conditions. + // Return producer fenced even in the cases where the epoch is higher and could indicate an invalid state transition. + // Use the following criteria to determine if a v2 retry is valid: + txnMetadata.state match { + case TransactionState.ONGOING | TransactionState.EMPTY | TransactionState.DEAD | TransactionState.PREPARE_EPOCH_FENCE => + producerEpoch == txnMetadata.clientProducerEpoch + case TransactionState.PREPARE_COMMIT | TransactionState.PREPARE_ABORT => + false // must not happen, as we already checked and returned concurrent transactions + case TransactionState.COMPLETE_COMMIT | TransactionState.COMPLETE_ABORT => + retryOnEpochBump || retryOnOverflow || producerEpoch == txnMetadata.producerEpoch + } + } else { + // If the epoch is going to be fenced, it bumps the epoch differently with TV2. + (!isFromClient || producerEpoch == txnMetadata.clientProducerEpoch) && producerEpoch >= txnMetadata.clientProducerEpoch } - } else { - // If the epoch is going to be fenced, it bumps the epoch differently with TV2. 
- (!isFromClient || producerEpoch == txnMetadata.producerEpoch) && producerEpoch >= txnMetadata.producerEpoch } - } - - val isRetry = retryOnEpochBump || retryOnOverflow - def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = { - // Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted - val nextProducerIdOrErrors = - if (!isEpochFence && txnMetadata.isProducerEpochExhausted) { - try { - Right(producerIdManager.generateProducerId()) - } catch { - case e: Exception => Left(Errors.forException(e)) + val isRetry = retryOnEpochBump || retryOnOverflow + + def generateTxnTransitMetadataForTxnCompletion(nextState: TransactionState, noPartitionAdded: Boolean): ApiResult[(Int, TxnTransitMetadata)] = { + // Maybe allocate new producer ID if the epoch is exhausted + // Note that we can arrive at this scenario when a client did a few keep-prepared calls and then + // decided to do one without keep-prepared (e.g. to force-terminate), so if we have + // nextProducerEpoch, then we need to do the epoch bump even for the isEpochFence case. + val nextProducerIdOrErrors = + if ((!isEpochFence || txnMetadata.hasNextProducerEpoch) && txnMetadata.isProducerEpochExhausted) { + try { + Right(producerIdManager.generateProducerId()) + } catch { + case e: Exception => Left(Errors.forException(e)) + } + } else { + Right(txnMetadata.nextProducerId) } + + // If the next producer epoch is set (which can happen if we used InitProducerId(keepPreparedTxn)) + // then we also need to bump this epoch -- we want to fence requests that were issued with that + // epoch after the transaction is complete. Consider this example: + // 1. (initial state): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = -1, nextProducerEpoch = -1 + // 2. 
InitProducerId(keepPreparedTxn): Ongoing, producerId = 42, producerEpoch = 100, nextProducerId = 42, nextProducerEpoch = 101 Review Comment: At step 2, are we returning epoch 100 or epoch 101 to the client? In other words, which epoch does the client send in step 3? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org