jsancio commented on code in PR #12032:
URL: https://github.com/apache/kafka/pull/12032#discussion_r847810681
##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2348,7 +2348,20 @@ class KafkaController(val config: KafkaConfig,
controllerContext.partitionLeadershipInfo(tp) match {
case Some(leaderIsrAndControllerEpoch) =>
val currentLeaderAndIsr =
leaderIsrAndControllerEpoch.leaderAndIsr
- if (newLeaderAndIsr.leaderRecoveryState ==
LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+ if (newLeaderAndIsr.leaderEpoch <
currentLeaderAndIsr.leaderEpoch) {
+ partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
+ None
+ } else if (newLeaderAndIsr.leaderEpoch >
currentLeaderAndIsr.leaderEpoch) {
+ partitionResponses(tp) = Left(Errors.UNKNOWN_LEADER_EPOCH)
Review Comment:
This is a new error returned by this handling. Can we
1. Make sure that `ReplicationControlManager` also returns this error
2. We handle this error in `kafka.cluster.Partition`
Because this error is new and not handle by `Partition` it will go to the
default behavior which is to retry. I think this is okay. Should we change the
code for future brokers to not retry? What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]