artemlivshits commented on code in PR #17698:
URL: https://github.com/apache/kafka/pull/17698#discussion_r1841271245
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         else logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
       case Empty =>
-        logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+        if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+          // If the client and server both use transaction V2, the client is allowed to commit/abort

Review Comment:
We don't have to allow commits on top of an Empty state, only aborts.

For commits there are only 2 cases:
1. The client didn't send any data to partitions / offsets: it knows for sure that a commit doesn't need to be issued.
2. The client successfully sent all data to partitions / offsets: it knows for sure that the transaction is ongoing.

For aborts there are 3 cases:
1. The client didn't send any data to partitions / offsets: it knows for sure that an abort doesn't need to be issued.
2. The client successfully sent data to some partitions / offsets: it knows for sure that the transaction is ongoing.
3. The client tried to send data, but the send did not succeed: it needs to send an abort on top of a potentially Empty state.
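A minimal Scala sketch of the case analysis above. It assumes the client can classify the sends of its current transaction into three outcomes; `EmptyStateEndTxnSketch`, `SendOutcome`, `EndTxnAction` and `clientAction` are hypothetical names for illustration, not Kafka APIs.

```scala
// Hypothetical model of the cases listed in the review comment; not Kafka code.
object EmptyStateEndTxnSketch {

  // What the client knows about the sends it attempted in the current transaction.
  sealed trait SendOutcome
  case object NothingSent extends SendOutcome // no partitions or offsets were touched
  case object AllSent extends SendOutcome     // every attempted send was acknowledged
  case object SendFailed extends SendOutcome  // at least one send did not succeed

  // What the client does when the transaction ends.
  sealed trait EndTxnAction
  case object SkipEndTxn extends EndTxnAction           // no request needs to be issued
  case object EndTxnOnOngoing extends EndTxnAction      // the coordinator state is known to be Ongoing
  case object AbortOnPossiblyEmpty extends EndTxnAction // the coordinator state may still be Empty

  def clientAction(commit: Boolean, outcome: SendOutcome): EndTxnAction = outcome match {
    case NothingSent => SkipEndTxn
    case AllSent     => EndTxnOnOngoing
    case SendFailed =>
      // The comment lists no commit case for an unsuccessful send: such a
      // transaction can only be aborted, so commits never need to land on Empty.
      require(!commit, "a transaction with a failed send can only be aborted")
      AbortOnPossiblyEmpty
  }
}
```

In this model only `AbortOnPossiblyEmpty` can reach a coordinator that is still `Empty`, which is why accepting aborts (but not commits) in that state would be enough.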
##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         else logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
       case Empty =>
-        logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+        if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {
+          // If the client and server both use transaction V2, the client is allowed to commit/abort
+          // transactions when the transaction state is Empty because the client can't be sure about the
+          // current transaction state.
+          // Note that, we should not use txnMetadata info to check if the client is using V2 because the
+          // only request received by server so far is InitProducerId request which does not tell whether
+          // the client uses V2.
+          Left(Errors.NONE)

Review Comment:
We cannot do this. We need to go through the actual abort sequence and bump the epoch (as if the transaction were ongoing). Otherwise we can have:
1. Produce to partition foo-1 with epoch 42.
2. Timeout.
3. Abort, leaving the same epoch 42.
4. The produce to partition foo-1 succeeds and adds partition foo-1 to the transaction.
5. The next transaction produces to partition bar-0.
6. That transaction commits, including the foo-1 data that should have been aborted in step 3.

##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -39,13 +39,33 @@ object AddPartitionsToTxnManager {
   val VerificationFailureRateMetricName = "VerificationFailureRate"
   val VerificationTimeMsMetricName = "VerificationTimeMs"
+
+  def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
+    if (version > 11) {
+      addPartition
+    } else if (version > 10) {
+      genericError

Review Comment:
This looks like we're returning an error. Can we change the names, e.g. `genericErrorSupported`, etc.?
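A sketch of the rename suggested for `AddPartitionsToTxnManager`, keeping the version thresholds from the diff. `genericErrorSupported` is the reviewer's suggestion; `defaultError` for the final `else` branch is an assumed name, because the hunk is cut off before that branch. The lowercase object names follow the diff's style.

```scala
// Hypothetical sketch of the suggested naming; mirrors the version checks shown in the diff.
object TransactionSupportedOperationSketch {
  sealed trait TransactionSupportedOperation
  // Assumed name for the fallback branch, which the diff does not show.
  case object defaultError extends TransactionSupportedOperation
  // Renamed from genericError: it describes what the client supports, not an error being returned.
  case object genericErrorSupported extends TransactionSupportedOperation
  // The produce request itself may add the partition to the transaction.
  case object addPartition extends TransactionSupportedOperation

  def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation =
    if (version > 11) addPartition
    else if (version > 10) genericErrorSupported
    else defaultError
}
```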
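The six-step sequence in the comment on `Left(Errors.NONE)` can be replayed with a toy coordinator. This is a sketch under simplifying assumptions, not Kafka's `TransactionCoordinator`: the coordinator tracks a single producer epoch, a produce request joins the open transaction only if it carries the current epoch, and `EpochBumpRaceSketch`, `Coordinator`, `addPartition`, `endTxn` and `replay` are hypothetical names.

```scala
import scala.collection.mutable

// Toy model of the race described in the review comment; not Kafka's TransactionCoordinator.
object EpochBumpRaceSketch {

  final class Coordinator(var epoch: Short) {
    // Partitions added to the currently open transaction.
    private val partitions = mutable.Set.empty[String]

    // A produce request implicitly adds its partition; this sketch accepts it only at the current epoch.
    def addPartition(partition: String, requestEpoch: Short): Boolean =
      if (requestEpoch == epoch) { partitions += partition; true } else false

    // Ends the transaction, returning the partitions it covered, and optionally bumps the epoch.
    def endTxn(bumpEpoch: Boolean): Set[String] = {
      val ended = partitions.toSet
      partitions.clear()
      if (bumpEpoch) epoch = (epoch + 1).toShort
      ended
    }
  }

  // Replays steps 1-6 and returns the partitions committed in step 6.
  def replay(bumpOnEmptyAbort: Boolean): Set[String] = {
    val coordinator = new Coordinator(42.toShort)
    // Steps 1-2: a produce to foo-1 is sent with epoch 42, then the client times out.
    val staleEpoch: Short = 42
    // Step 3: the client aborts while the coordinator state is still Empty.
    coordinator.endTxn(bumpEpoch = bumpOnEmptyAbort)
    // Step 4: the delayed produce to foo-1 arrives and tries to join a transaction.
    coordinator.addPartition("foo-1", staleEpoch)
    // Step 5: the next transaction produces to bar-0 with the epoch the client now holds.
    coordinator.addPartition("bar-0", coordinator.epoch)
    // Step 6: the next transaction commits.
    coordinator.endTxn(bumpEpoch = true)
  }
}
```

With `bumpOnEmptyAbort = false`, `replay` returns both `foo-1` and `bar-0`, so the stale foo-1 data is committed in step 6; with `bumpOnEmptyAbort = true`, the delayed produce is rejected in step 4 and only `bar-0` is committed.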
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -629,10 +629,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Return true if the given producer ID has a transaction ongoing.
+   * Note, if the incoming producer epoch is newer than the stored one, the transaction may have finished.
    */
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
+  def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
     val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+    entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() >= producerEpoch

Review Comment:
Should it be just `entry.producerEpoch() == producerEpoch`?

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -898,6 +898,8 @@ class ReplicaManager(val config: KafkaConfig,
           Errors.COORDINATOR_NOT_AVAILABLE | Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
             s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+        case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(

Review Comment:
Do we actually return `UNKNOWN_PRODUCER_ID`?

##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -610,7 +610,17 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
         else logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
       case Empty =>
-        logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
+        if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence)) {

Review Comment:
We should also support aborts on top of CompleteAbort and CompleteCommit.
The epoch indicates whether the incoming abort is a retry (in which case we just need to return the previous result) or a new empty abort (in which case we need to go through the full abort sequence, with the epoch bump, etc.).

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +904,22 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }
         String transactionalId = null;
+        boolean canUseTransactionV2AboveVersion = true;

Review Comment:
Should we just name it `useTransactionV1Version`, with the default value `false`, and set it to `true` if we need to use a non-latest version?
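To make the `>=` versus `==` question on `hasOngoingTransaction` concrete, this sketch evaluates both predicates side by side. `Entry` is a hypothetical stand-in for the producer state entry, and `Option[Entry]` replaces the nullable `activeProducers.get(producerId)` lookup; neither is Kafka's actual type.

```scala
// Hypothetical stand-in for the producer state lookup; not Kafka's UnifiedLog or producer state types.
object OngoingTxnCheckSketch {
  final case class Entry(producerEpoch: Short, currentTxnFirstOffset: Option[Long])

  // The condition in the diff: the stored epoch is at least the incoming one.
  def ongoingAtLeastEpoch(entry: Option[Entry], producerEpoch: Short): Boolean =
    entry.exists(e => e.currentTxnFirstOffset.isDefined && e.producerEpoch >= producerEpoch)

  // The alternative raised in the review: the stored epoch equals the incoming one.
  def ongoingExactEpoch(entry: Option[Entry], producerEpoch: Short): Boolean =
    entry.exists(e => e.currentTxnFirstOffset.isDefined && e.producerEpoch == producerEpoch)
}
```

Both predicates return `false` when the incoming epoch is newer than the stored one (the case the new doc comment calls out) and `true` when the epochs match; they differ only when the incoming epoch is older, which the `>=` check still reports as an ongoing transaction.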
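The comment on supporting aborts on top of `CompleteAbort` and `CompleteCommit` can be read as a small decision table keyed on the request epoch. A hypothetical sketch: it assumes that completing a transaction under transaction V2 bumps the epoch, so a retried abort still carries the previous epoch (`currentEpoch - 1`) while a new abort carries the current one. Rejecting every other combination, including a previous-epoch abort after `CompleteCommit`, is a choice made for this sketch, not something the comment specifies; `EndTxnRetrySketch` and `onAbort` are illustrative names.

```scala
// Hypothetical decision table for an abort that arrives when no transaction is ongoing.
object EndTxnRetrySketch {
  sealed trait TxnState
  case object Empty extends TxnState
  case object CompleteAbort extends TxnState
  case object CompleteCommit extends TxnState

  sealed trait Decision
  case object ReturnPreviousResult extends Decision // a retry of the abort that already completed
  case object RunFullAbort extends Decision         // bump the epoch and go through the full abort sequence
  case object RejectRequest extends Decision        // any other epoch: treated as an error in this sketch

  // Assumption: completing a transaction bumped the epoch, so a retried abort still
  // carries the epoch the client held before that bump.
  def onAbort(state: TxnState, currentEpoch: Short, requestEpoch: Short): Decision =
    if (requestEpoch == currentEpoch) RunFullAbort
    else if (state == CompleteAbort && requestEpoch + 1 == currentEpoch) ReturnPreviousResult
    else RejectRequest
}
```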
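The naming suggestion for `Sender.java` flips the flag so that it is off by default and only turned on when a non-latest version is needed. A Scala sketch of that shape; `ProduceVersionSketch`, `maxProduceVersion`, `transactionV2Enabled` and the cap of 11 are assumptions, not the actual `Sender.java` logic. The cap is inferred from the `AddPartitionsToTxnManager` diff, where produce request versions above 11 map to `addPartition`.

```scala
// Hypothetical sketch of the suggested flag; not the Sender.java implementation.
object ProduceVersionSketch {
  // Assumption: inferred from the AddPartitionsToTxnManager diff, where produce
  // request versions above 11 map to the transaction V2 `addPartition` behavior.
  val LastTransactionV1ProduceVersion: Short = 11

  def maxProduceVersion(latestVersion: Short, isTransactional: Boolean, transactionV2Enabled: Boolean): Short = {
    // False by default; only true when an older, non-latest version is required.
    val useTransactionV1Version = isTransactional && !transactionV2Enabled
    if (useTransactionV1Version) LastTransactionV1ProduceVersion else latestVersion
  }
}
```

Naming the flag after the exceptional case keeps the common path (latest version) as the default, which is the readability point the review comment makes.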