artemlivshits commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1796293859

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1568,6 +1571,17 @@ public void handleResponse(AbstractResponse response) {
             Errors error = endTxnResponse.error();

             if (error == Errors.NONE) {
+                // For transaction version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
+                // KIP-890 Part 2 mandates bumping the epoch after every transaction. If the epoch overflows,
+                // a new producerId is returned with epoch set to 0.
+                if (isTransactionV2Enabled) {

Review Comment:
   > Do we not force sending version 5? I think we may want to do that and get UnsupportedVersionException

   The desired behavior is for this^^ to happen. If TV (the transaction version) is downgraded in the middle of a transaction, we don't change `isTransactionV2Enabled` until the end of the transaction and continue using the new protocol and the new RPC versions. If the broker image hasn't been downgraded yet, then it'll just work. If the broker has managed to downgrade, then we'd get UnsupportedVersionException, so we should never get an older-version response when `isTransactionV2Enabled == true`.

   There are other ways to implement downgrade, but this one is simple to prove correct. For example, we don't need to consider every case where the protocol gets switched in the middle of a transaction and we could send mixed versions to different brokers, and then prove that all of those combinations work correctly.
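To make the invariant above concrete, here is a minimal sketch of the latching idea. It is not the actual `TransactionManager` code: every class and method name in it (`LatchedProtocolSketch`, `BrokerSketch`, `ProducerSketch`, `onTransactionVersionChange`, and so on) is hypothetical, and the use of version 4 as the old-protocol EndTxn version is an assumption made only for illustration. It only models the two outcomes described: the flag stays fixed until the transaction ends, and a broker either answers at the requested version or refuses the request.

```java
// Illustrative sketch only: none of these types are real Kafka classes.
class LatchedProtocolSketch {

    /** Hypothetical stand-in for Kafka's UnsupportedVersionException. */
    static class UnsupportedVersionSketchException extends RuntimeException {
        UnsupportedVersionSketchException(String message) {
            super(message);
        }
    }

    /** Hypothetical broker: rejects request versions newer than it supports. */
    static class BrokerSketch {
        private short maxEndTxnVersion;

        BrokerSketch(short maxEndTxnVersion) {
            this.maxEndTxnVersion = maxEndTxnVersion;
        }

        void downgradeTo(short version) {
            this.maxEndTxnVersion = version;
        }

        /** Answers at the request's version or refuses; it never answers with an older version. */
        short handleEndTxn(short requestVersion) {
            if (requestVersion > maxEndTxnVersion) {
                throw new UnsupportedVersionSketchException(
                    "EndTxn v" + requestVersion + " unsupported, broker max is v" + maxEndTxnVersion);
            }
            return requestVersion;
        }
    }

    /** Hypothetical producer-side state: the protocol choice is latched per transaction. */
    static class ProducerSketch {
        static final short OLD_PROTOCOL_END_TXN_VERSION = 4; // assumption for this sketch
        static final short NEW_PROTOCOL_END_TXN_VERSION = 5; // "version 5+" in the diff comment

        private boolean latestV2Supported;      // most recent transaction version seen
        private boolean isTransactionV2Enabled; // value latched for the ongoing transaction
        private boolean inTransaction;

        ProducerSketch(boolean v2Supported) {
            this.latestV2Supported = v2Supported;
            this.isTransactionV2Enabled = v2Supported;
        }

        /** Records a transaction version change; the latch moves only outside a transaction. */
        void onTransactionVersionChange(boolean v2Supported) {
            latestV2Supported = v2Supported;
            if (!inTransaction) {
                isTransactionV2Enabled = v2Supported;
            }
        }

        void beginTransaction() {
            inTransaction = true;
        }

        /** Sends EndTxn at the latched version, then picks up any pending version change. */
        short endTransaction(BrokerSketch broker) {
            short requestVersion = isTransactionV2Enabled
                ? NEW_PROTOCOL_END_TXN_VERSION
                : OLD_PROTOCOL_END_TXN_VERSION;
            try {
                return broker.handleEndTxn(requestVersion);
            } finally {
                inTransaction = false;
                isTransactionV2Enabled = latestV2Supported;
            }
        }

        boolean isTransactionV2Enabled() {
            return isTransactionV2Enabled;
        }
    }

    public static void main(String[] args) {
        BrokerSketch broker = new BrokerSketch((short) 5);
        ProducerSketch producer = new ProducerSketch(true);
        producer.beginTransaction();
        producer.onTransactionVersionChange(false); // downgrade seen mid-transaction
        System.out.println("EndTxn response version: " + producer.endTransaction(broker));
    }
}
```

In this model, a response older than version 5 can never reach the handler while `isTransactionV2Enabled` is true: the broker that has not downgraded answers at version 5, and the one that has downgraded throws instead of responding. That is the property that keeps the correctness argument small.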