[ 
https://issues.apache.org/jira/browse/KAFKA-19999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18045637#comment-18045637
 ] 

Justine Olshan commented on KAFKA-19999:
----------------------------------------

> In Transaction V2, brokers enforce strict epoch monotonicity for control 
> batches (markers), requiring marker_epoch > current_epoch. Consequently, the 
> broker rejects the recovery marker with InvalidProducerEpochException.

This is not true for 4.0 and 4.1, right? Wasn't this change introduced in 4.2?

> Transaction coordinator livelock caused by invalid producer epoch
> -----------------------------------------------------------------
>
>                 Key: KAFKA-19999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19999
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Blocker
>             Fix For: 4.2.0, 4.0.2, 4.1.2
>
>
> *case 1: during recovery*
> When a Transaction Coordinator fails over and reloads transactions in 
> PREPARE_COMMIT or PREPARE_ABORT state from the transaction log, it currently 
> reuses the logged producer epoch to send the transaction markers.
> In Transaction V2, brokers enforce strict epoch monotonicity for control 
> batches (markers), requiring marker_epoch > current_epoch. Consequently, the 
> broker rejects the recovery marker with InvalidProducerEpochException.
> {code:java}
>     private void checkProducerEpoch(short producerEpoch, long offset, short transactionVersion) {
>         short current = updatedEntry.producerEpoch();
>         boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current);
>         if (invalidEpoch) {
>             String comparison = (transactionVersion >= 2) ? "<=" : "<";
>             String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
>                     " is " + producerEpoch + ", which is " + comparison + " the last seen epoch " + current +
>                     " (TV" + transactionVersion + ")";
>             if (origin == AppendOrigin.REPLICATION) {
>                 log.warn(message);
>             } else {
>                 // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
>                 // producer send response callback to differentiate from the former fatal exception,
>                 // letting client abort the ongoing transaction and retry.
>                 throw new InvalidProducerEpochException(message);
>             }
>         }
>     }
> {code}
> The coordinator handles this error by removing the pending transaction from 
> memory without writing a COMPLETE state to the transaction log. This leaves 
> the transaction permanently hanging in the PREPARE state. Clients attempting 
> to continue or commit offsets subsequently fail with CONCURRENT_TRANSACTIONS 
> in an infinite retry loop.
> {code:scala}
>                     case Errors.INVALID_PRODUCER_EPOCH |
>                          Errors.TRANSACTION_COORDINATOR_FENCED => // producer or coordinator epoch has changed, this txn can now be ignored
>                       info(s"Sending $transactionalId's transaction marker for partition $topicPartition has permanently failed with error ${error.exceptionName} " +
>                         s"with the current coordinator epoch ${epochAndMetadata.coordinatorEpoch}; cancel sending any more transaction markers $txnMarker to the brokers")
>                       txnMarkerChannelManager.removeMarkersForTxn(pendingCompleteTxn)
>                       abortSending = true
> {code}
> *case 2: due to disconnection*
> If a WriteTxnMarkersRequest is persisted by a partition leader but the 
> response is lost (e.g., due to disconnection), the Transaction Coordinator 
> (TC) retries the request using the same producer epoch.
> If the target broker restarts and restores its state before receiving the 
> retry, it rejects the request with InvalidProducerEpochException.
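
For illustration only (not Kafka source code): a minimal sketch of why a retried marker with the same epoch is rejected under Transaction V2. The class name MarkerEpochSketch and the method isInvalidEpoch are hypothetical; the comparison mirrors the quoted checkProducerEpoch. The sketch assumes that once the partition leader has persisted a marker, its last seen epoch equals that marker's epoch.

```java
public class MarkerEpochSketch {
    // Same comparison as the quoted checkProducerEpoch:
    // TV2 rejects epochs <= current, older versions reject only epochs < current.
    public static boolean isInvalidEpoch(short markerEpoch, short currentEpoch, short transactionVersion) {
        return transactionVersion >= 2 ? markerEpoch <= currentEpoch : markerEpoch < currentEpoch;
    }

    public static void main(String[] args) {
        short tv1 = 1;
        short tv2 = 2;
        short current = 5; // epoch the partition leader last saw (illustrative value)
        short marker = 6;  // epoch carried by the transaction marker (illustrative value)

        // First attempt: 6 > 5, so the marker is accepted.
        System.out.println(isInvalidEpoch(marker, current, tv2)); // false

        // Assumption: the leader persisted the marker, but the response was lost.
        current = marker;

        // Retry with the same epoch: 6 <= 6, so TV2 rejects it.
        System.out.println(isInvalidEpoch(marker, current, tv2)); // true

        // The non-strict check used before TV2 would still accept the retry.
        System.out.println(isInvalidEpoch(marker, current, tv1)); // false
    }
}
```

The same comparison would apply to case 1, where the coordinator reuses the logged epoch after failover.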


