hachikuji commented on a change in pull request #8239: URL: https://github.com/apache/kafka/pull/8239#discussion_r453899181
########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ########## @@ -501,6 +502,21 @@ class TransactionCoordinator(brokerId: Int, info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " + s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed") + if (isEpochFence) { + txnManager.getTransactionState(transactionalId).foreach { + case None => + warn(s"The coordinator still owns the transaction partition for $transactionalId, but there is " + + s"no metadata in the cache; this is not expected") + + case Some(epochAndMetadata) => + if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) { + // This was attempted epoch fence that failed, so mark this state on the metadata + epochAndMetadata.transactionMetadata.hasFailedEpochFence = true + warn("") Review comment: Looks like the message for this `warn` call was forgotten. ########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ########## @@ -210,7 +214,9 @@ private[transaction] class TransactionMetadata(val transactionalId: String, if (producerEpoch == Short.MaxValue) throw new IllegalStateException(s"Cannot fence producer with epoch equal to Short.MaxValue since this would overflow") - prepareTransitionTo(PrepareEpochFence, producerId, (producerEpoch + 1).toShort, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, + val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else (producerEpoch + 1).toShort Review comment: I believe it's accurate to say that if `hasFailedEpochFence` is set, then the bumped epoch could not have been returned to the client. Is that right? It might be worth a comment emphasizing that. 
########## File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ########## @@ -564,7 +564,7 @@ class TransactionCoordinatorTest { .anyTimes() val originalMetadata = new TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 1).toShort, - producerEpoch, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds()) + (producerEpoch - 1).toShort, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds()) Review comment: Hmm, was this wrong? It seems weird to have last producer epoch set to a value that is 2 less than the producer epoch. ########## File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ########## @@ -614,6 +614,83 @@ class TransactionCoordinatorTest { EasyMock.verify(transactionManager) } + @Test + def shouldNotRepeatedlyBumpEpochDueToInitPidDuringOngoingTxnIfAppendToLogFails(): Unit = { + val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, producerEpoch, + RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds()) + + EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt())) + .andReturn(true) + .anyTimes() + + EasyMock.expect(transactionManager.putTransactionStateIfNotExists(EasyMock.anyObject[TransactionMetadata]())) + .andReturn(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))) + .anyTimes() + + EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId))) + .andAnswer(() => Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata)))) + .anyTimes() + + /* val txnMetadataAfterAppendFailure = new TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 1).toShort, Review comment: nit: did we not need this commented-out block? 
########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ########## @@ -394,8 +395,8 @@ class TransactionCoordinator(brokerId: Int, if (nextState == PrepareAbort && txnMetadata.pendingState.contains(PrepareEpochFence)) { // We should clear the pending state to make way for the transition to PrepareAbort and also bump // the epoch in the transaction metadata we are about to append. + isEpochFence = true txnMetadata.pendingState = None - txnMetadata.lastProducerEpoch = txnMetadata.producerEpoch Review comment: Can you explain why we no longer need to set this? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org