hachikuji commented on a change in pull request #8239:
URL: https://github.com/apache/kafka/pull/8239#discussion_r453899181



##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -501,6 +502,21 @@ class TransactionCoordinator(brokerId: Int,
               info(s"Aborting sending of transaction markers and returning 
$error error to client for $transactionalId's EndTransaction request of 
$txnMarkerResult, " +
                 s"since appending $newMetadata to transaction log with 
coordinator epoch $coordinatorEpoch failed")
 
+              if (isEpochFence) {
+                txnManager.getTransactionState(transactionalId).foreach {
+                  case None =>
+                    warn(s"The coordinator still owns the transaction 
partition for $transactionalId, but there is " +
+                      s"no metadata in the cache; this is not expected")
+
+                  case Some(epochAndMetadata) =>
+                    if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) 
{
+                      // This was attempted epoch fence that failed, so mark 
this state on the metadata
+                      epochAndMetadata.transactionMetadata.hasFailedEpochFence 
= true
+                      warn("")

Review comment:
       Looks like this was forgotten: the `warn("")` call logs an empty message.

##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##########
@@ -210,7 +214,9 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
     if (producerEpoch == Short.MaxValue)
       throw new IllegalStateException(s"Cannot fence producer with epoch equal 
to Short.MaxValue since this would overflow")
 
-    prepareTransitionTo(PrepareEpochFence, producerId, (producerEpoch + 
1).toShort, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs,
+    val bumpedEpoch = if (hasFailedEpochFence) producerEpoch else 
(producerEpoch + 1).toShort

Review comment:
       I believe it's accurate to say that if `hasFailedEpochFence` is set, 
then the bumped epoch could not have been returned to the client. Is that 
right? It might be worth a comment emphasizing that.

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
##########
@@ -564,7 +564,7 @@ class TransactionCoordinatorTest {
       .anyTimes()
 
     val originalMetadata = new TransactionMetadata(transactionalId, 
producerId, producerId, (producerEpoch + 1).toShort,
-      producerEpoch, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), 
time.milliseconds())
+      (producerEpoch - 1).toShort, txnTimeoutMs, Ongoing, partitions, 
time.milliseconds(), time.milliseconds())

Review comment:
       Hmm, was this wrong? It seems weird to have the last producer epoch set to a 
value that is 2 less than the producer epoch.

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
##########
@@ -614,6 +614,83 @@ class TransactionCoordinatorTest {
     EasyMock.verify(transactionManager)
   }
 
+  @Test
+  def 
shouldNotRepeatedlyBumpEpochDueToInitPidDuringOngoingTxnIfAppendToLogFails(): 
Unit = {
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, 
producerId, producerEpoch,
+      RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, 
time.milliseconds(), time.milliseconds())
+
+    
EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
+      .andReturn(true)
+      .anyTimes()
+
+    
EasyMock.expect(transactionManager.putTransactionStateIfNotExists(EasyMock.anyObject[TransactionMetadata]()))
+      .andReturn(Right(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata)))
+      .anyTimes()
+
+    
EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
+      .andAnswer(() => 
Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+      .anyTimes()
+
+    /* val txnMetadataAfterAppendFailure = new 
TransactionMetadata(transactionalId, producerId, producerId, (producerEpoch + 
1).toShort,

Review comment:
       nit: did we not need this? If not, let's remove the commented-out code.

##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -394,8 +395,8 @@ class TransactionCoordinator(brokerId: Int,
                 if (nextState == PrepareAbort && 
txnMetadata.pendingState.contains(PrepareEpochFence)) {
                   // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
                   // the epoch in the transaction metadata we are about to 
append.
+                  isEpochFence = true
                   txnMetadata.pendingState = None
-                  txnMetadata.lastProducerEpoch = txnMetadata.producerEpoch

Review comment:
       Can you explain why we no longer need to set this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to