artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1841271245


##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case Empty =>
-                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                  // If the client and server both use transaction V2, the client is allowed to commit/abort

Review Comment:
   We don't have to allow commits on top of the Empty state, only aborts. For commits there are only 2 cases:
   1. The client didn't send any data to partitions / offsets -- it knows for sure that a commit doesn't need to be issued.
   2. The client successfully sent all data to partitions / offsets -- it knows for sure that the transaction is ongoing.
   
   For aborts there are 3 cases:
   1. The client didn't send any data to partitions / offsets -- it knows for sure that an abort doesn't need to be issued.
   2. The client successfully sent data to some partitions / offsets -- it knows for sure that the transaction is ongoing.
   3. The client tried to send data, but the send did not succeed -- it needs to send an abort on top of a potentially Empty state.
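
To make the cases above concrete, here is a minimal sketch of the decision for an EndTxn that arrives while the state is Empty. The class, enums and `onEmptyState` helper are hypothetical names for illustration, not coordinator code, and the `pendingState` check from the diff is left out for brevity.

```java
final class EmptyStateEndTxnSketch {
    enum Requested { COMMIT, ABORT }
    enum Decision { INVALID_TXN_STATE, ALLOW_ABORT }

    // Hypothetical helper: only an abort from a client that supports the
    // epoch bump is accepted on top of Empty (abort case 3 above).
    static Decision onEmptyState(Requested requested, boolean clientSupportsEpochBump) {
        if (clientSupportsEpochBump && requested == Requested.ABORT) {
            return Decision.ALLOW_ABORT;
        }
        // A commit on Empty matches neither commit case, so it stays an invalid transition.
        return Decision.INVALID_TXN_STATE;
    }

    public static void main(String[] args) {
        System.out.println(onEmptyState(Requested.ABORT, true));
        System.out.println(onEmptyState(Requested.COMMIT, true));
        System.out.println(onEmptyState(Requested.ABORT, false));
    }
}
```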
   



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case Empty =>
-                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+                  // If the client and server both use transaction V2, the client is allowed to commit/abort
+                  // transactions when the transaction state is Empty because the client can't be sure about the
+                  // current transaction state.
+                  // Note that, we should not use txnMetadata info to check if the client is using V2 because the
+                  // only request received by server so far is InitProducerId request which does not tell whether
+                  // the client uses V2.
+                  Left(Errors.NONE)

Review Comment:
   We cannot do this. We need to go through the actual abort sequence and bump the epoch (as if the transaction were ongoing). Otherwise we can have:
   1. A produce to partition foo-1 is sent with epoch 42.
   2. The request times out.
   3. The client aborts, and the epoch stays at 42.
   4. The produce to partition foo-1 succeeds and adds partition foo-1 to the transaction.
   5. The next transaction produces to partition bar-0.
   6. It commits the data that should have been aborted in step 3.
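
The sequence above can be replayed with a toy model. This is only a sketch: `ToyCoordinator` and its methods are hypothetical, and it assumes the coordinator rejects an add-partition request whose epoch is older than the current one.

```java
import java.util.Set;
import java.util.TreeSet;

final class EmptyAbortEpochSketch {
    // Toy stand-in for the transaction coordinator (not Kafka's class).
    static final class ToyCoordinator {
        private short epoch = 42;
        private final Set<String> partitionsInTxn = new TreeSet<>();

        short epoch() { return epoch; }

        // Step 3: abort on top of an empty transaction, with or without an epoch bump.
        void abort(boolean bumpEpoch) {
            partitionsInTxn.clear();
            if (bumpEpoch) {
                epoch++;
            }
        }

        // Assumption: requests carrying an older epoch are fenced.
        boolean addPartition(short requestEpoch, String partition) {
            if (requestEpoch < epoch) {
                return false;
            }
            partitionsInTxn.add(partition);
            return true;
        }

        // Returns the partitions whose data the commit covers.
        Set<String> commit() {
            Set<String> committed = new TreeSet<>(partitionsInTxn);
            partitionsInTxn.clear();
            return committed;
        }
    }

    static Set<String> replay(boolean bumpEpochOnAbort) {
        ToyCoordinator coordinator = new ToyCoordinator();
        short staleEpoch = coordinator.epoch();            // steps 1-2: produce sent with epoch 42, then timeout
        coordinator.abort(bumpEpochOnAbort);               // step 3
        coordinator.addPartition(staleEpoch, "foo-1");     // step 4: the delayed produce lands
        coordinator.addPartition(coordinator.epoch(), "bar-0"); // step 5
        return coordinator.commit();                       // step 6
    }

    public static void main(String[] args) {
        System.out.println("no bump: " + replay(false));
        System.out.println("bump:    " + replay(true));
    }
}
```

Without the bump, foo-1 ends up in the committed set; with the bump, the delayed request is fenced and only bar-0 is committed.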



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -39,13 +39,33 @@ object AddPartitionsToTxnManager {
 
   val VerificationFailureRateMetricName = "VerificationFailureRate"
   val VerificationTimeMsMetricName = "VerificationTimeMs"
+
+  def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
+    if (version > 11) {
+      addPartition
+    } else if (version > 10) {
+      genericError

Review Comment:
   This looks like we're returning an error. Can we change the names, maybe to `genericErrorSupported`, etc.?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -629,10 +629,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Return true if the given producer ID has a transaction ongoing.
+   * Note, if the incoming producer epoch is newer than the stored one, the transaction may have finished.
    */
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
+  def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
     val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+    entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() >= producerEpoch

Review Comment:
   Should it be just `entry.producerEpoch() == producerEpoch`?
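
For reference, a small sketch of where the two predicates disagree. `ToyProducerEntry` is a hypothetical stand-in for the producer state entry, not the real class; the only case that differs is a stored epoch newer than the incoming one.

```java
import java.util.OptionalLong;

final class OngoingTxnEpochCheckSketch {
    // Hypothetical stand-in for the producer state entry.
    static final class ToyProducerEntry {
        final short producerEpoch;
        final OptionalLong currentTxnFirstOffset;

        ToyProducerEntry(short producerEpoch, OptionalLong currentTxnFirstOffset) {
            this.producerEpoch = producerEpoch;
            this.currentTxnFirstOffset = currentTxnFirstOffset;
        }
    }

    // Check as written in the diff: stored epoch >= incoming epoch.
    static boolean ongoingAtLeast(ToyProducerEntry entry, short incomingEpoch) {
        return entry != null && entry.currentTxnFirstOffset.isPresent()
            && entry.producerEpoch >= incomingEpoch;
    }

    // Check suggested in this comment: stored epoch == incoming epoch.
    static boolean ongoingExact(ToyProducerEntry entry, short incomingEpoch) {
        return entry != null && entry.currentTxnFirstOffset.isPresent()
            && entry.producerEpoch == incomingEpoch;
    }

    public static void main(String[] args) {
        ToyProducerEntry stored43 = new ToyProducerEntry((short) 43, OptionalLong.of(100L));
        // Incoming epoch 42 is older than the stored 43: the two checks disagree.
        System.out.println(ongoingAtLeast(stored43, (short) 42));
        System.out.println(ongoingExact(stored43, (short) 42));
    }
}
```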



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -898,6 +898,8 @@ class ReplicaManager(val config: KafkaConfig,
                    Errors.COORDINATOR_NOT_AVAILABLE |
                    Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
                 s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+              case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(

Review Comment:
   Do we actually return `UNKNOWN_PRODUCER_ID`?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 else
                   logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case Empty =>
-                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+                if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {

Review Comment:
   We should also support aborts on top of CompleteAbort and CompleteCommit. The epoch indicates whether the incoming abort is a retry (then we just need to return the previous result) or a new empty abort (in which case we need to go through the full abort sequence, including the epoch bump).
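
A rough sketch of that epoch-based distinction follows. It rests on an assumption that is not spelled out in this thread: completing a transaction bumps the stored epoch by one, so a retry carries the stored epoch minus one and a new abort carries the stored epoch. All names are hypothetical and epoch overflow is ignored.

```java
final class CompletedStateAbortSketch {
    enum Action { RETURN_PREVIOUS_RESULT, RUN_FULL_ABORT, REJECT }

    // Hypothetical helper for an abort arriving on top of CompleteAbort / CompleteCommit.
    static Action onAbortAfterCompletion(short storedEpoch, short requestEpoch) {
        if (requestEpoch == storedEpoch) {
            // New empty abort: run the full abort sequence, including the epoch bump.
            return Action.RUN_FULL_ABORT;
        }
        if (requestEpoch == storedEpoch - 1) {
            // Assumed retry of the request that completed the previous transaction.
            return Action.RETURN_PREVIOUS_RESULT;
        }
        // Anything else is treated as invalid in this sketch.
        return Action.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(onAbortAfterCompletion((short) 43, (short) 43));
        System.out.println(onAbortAfterCompletion((short) 43, (short) 42));
        System.out.println(onAbortAfterCompletion((short) 43, (short) 40));
    }
}
```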



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,22 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }
 
         String transactionalId = null;
+        boolean canUseTransactionV2AboveVersion = true;

Review Comment:
   Should we just name it `useTransactionV1Version`, default it to `false`, and set it to `true` when we need to use a non-latest version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

