artemlivshits commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1796293859


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1568,6 +1571,17 @@ public void handleResponse(AbstractResponse response) {
             Errors error = endTxnResponse.error();
 
             if (error == Errors.NONE) {
+                // For transaction version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
+                // KIP-890 Part 2 mandates bumping the epoch after every transaction. If the epoch overflows,
+                // a new producerId is returned with epoch set to 0.
+                if (isTransactionV2Enabled) {

Review Comment:
   > Do we not force sending version 5? I think we may want to do that and get 
UnsupportedVersionException
   
   The desired behavior is for this^^ to happen.  If TV is downgraded in the 
middle of a transaction, we don't change `isTransactionV2Enabled` until the 
transaction ends, and we continue using the new protocol and new RPC versions.  
If the broker image hasn't downgraded yet, then it'll just work.  If the broker 
has managed to downgrade, then we'd get UnsupportedVersionException, so we 
should never get an older version response when `isTransactionV2Enabled == 
true`.  There are other ways to implement downgrade, but with this one it is 
simple to prove correctness (for example, we don't need to consider all the 
combinations of cases where the protocol gets switched in the middle and we 
send mixed versions to different brokers, and then prove that all of those 
combinations work correctly).
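
To make the latching behavior described above concrete, here is a minimal sketch. It is not the actual `TransactionManager` code: the class `TxnVersionLatchSketch`, its methods, and the `LEGACY_END_TXN_VERSION` value are hypothetical names chosen for illustration. Only the flag name `isTransactionV2Enabled` and EndTxn version 5 come from the discussion.

```java
// Hypothetical sketch, not Kafka's TransactionManager.
final class TxnVersionLatchSketch {
    // EndTxn version used with transaction V2 (from the discussion above).
    static final short V2_END_TXN_VERSION = 5;
    // Assumed placeholder for "whatever older version the client would use".
    static final short LEGACY_END_TXN_VERSION = 4;

    // Latest TV2 support the client has observed from the cluster.
    private boolean latestTv2Supported;
    // Latched value: only refreshed at transaction boundaries.
    private boolean isTransactionV2Enabled;
    private boolean inTransaction;

    TxnVersionLatchSketch(boolean tv2Supported) {
        this.latestTv2Supported = tv2Supported;
        this.isTransactionV2Enabled = tv2Supported;
    }

    // Called when the client learns of a TV upgrade or downgrade.
    void onFeatureUpdate(boolean tv2Supported) {
        latestTv2Supported = tv2Supported;
        if (!inTransaction) {
            // Safe to switch: no transaction is in flight.
            isTransactionV2Enabled = tv2Supported;
        }
        // Otherwise keep the latched value until the transaction ends.
    }

    void beginTransaction() {
        isTransactionV2Enabled = latestTv2Supported;
        inTransaction = true;
    }

    // The version is forced from the latched flag, so a downgraded broker
    // rejects the request instead of answering with an older response.
    short endTxnRequestVersion() {
        return isTransactionV2Enabled ? V2_END_TXN_VERSION : LEGACY_END_TXN_VERSION;
    }

    void endTransaction() {
        inTransaction = false;
        // Pick up any downgrade that arrived mid-transaction.
        isTransactionV2Enabled = latestTv2Supported;
    }

    boolean isTransactionV2Enabled() {
        return isTransactionV2Enabled;
    }
}
```

With this shape, a downgrade that arrives mid-transaction changes nothing until `endTransaction()`, so a single transaction never mixes protocol versions across brokers.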
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org