[ 
https://issues.apache.org/jira/browse/KAFKA-20488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078280#comment-18078280
 ] 

David Arthur commented on KAFKA-20488:
--------------------------------------

Sounds like this can only happen if the KRaft epoch ends up higher than the ZK 
controller epoch. I think this should be quite rare in practice. Since we 
already have manual steps for the rollback (deleting some ZK state), adding an 
extra step for preventing this case seems quite reasonable.

+1 for documenting this and adding the above workaround instructions to the 
rollback instructions.
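
As a rough illustration of the condition described above, the following Python sketch compares the epoch recorded in the /migration znode with the epoch a newly elected ZK controller would derive. The function name is hypothetical, the znode contents are the sample values quoted in the issue below, and the "+1" rule is taken from the issue description rather than checked against the controller code.

```python
import json


def kraft_epoch_exceeds_zk(migration_znode: str, controller_epoch_znode: str) -> bool:
    """Return True if a rollback could hit this issue (hypothetical helper).

    Assumes, per the issue description, that the ZK controller elected on
    rollback uses the stored /controller_epoch value plus one as its epoch.
    """
    kraft_epoch = json.loads(migration_znode)["kraft_controller_epoch"]
    next_zk_epoch = int(controller_epoch_znode.strip()) + 1
    return kraft_epoch > next_zk_epoch


# Sample znode contents copied from the issue description.
MIGRATION = (
    '{"version":0,"kraft_metadata_offset":870200,"kraft_controller_id":902,'
    '"kraft_metadata_epoch":502,"kraft_controller_epoch":502}'
)
CONTROLLER_EPOCH = "226"

at_risk = kraft_epoch_exceeds_zk(MIGRATION, CONTROLLER_EPOCH)
```

With the quoted values the check reports that the cluster is at risk, since 502 is well above 227.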

> KRaft migration rollback causes offline partitions when KRaft epoch exceeds 
> ZK controller epoch
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20488
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft, migration
>    Affects Versions: 3.9.2
>            Reporter: Michael Westerby
>            Priority: Major
>
> When reverting to ZooKeeper mode during a KRaft migration, partitions that 
> experienced state changes (ISR changes, leader elections, etc.) during the 
> dual-write phase may be left without a leader after the rollback and go 
> offline.
> This occurs because, during the dual-write phase, the KRaft controller epoch 
> (a counter separate from the ZooKeeper controller epoch) is written into 
> each affected partition's state znode as the {{controller_epoch}} field.
> On rollback, a ZooKeeper broker wins the controller election and increments 
> the {{/controller_epoch}} znode by 1 to derive its own epoch. When it then 
> attempts to elect leaders for partitions, it reads back each partition state 
> znode and checks whether the stored {{controller_epoch}} exceeds its own. If 
> the KRaft controller epoch written during the dual-write phase was higher 
> than this newly derived ZooKeeper epoch, the controller treats the znode as 
> having been written by a newer controller, aborts the leader election for 
> that partition, and the partition goes offline.
> h3. Example:
> During the migration, our KRaft controllers had an epoch of 502. This can be 
> seen in the {{/migration}} znode:
> {code:java}
> get /migration
> {"version":0,"kraft_metadata_offset":870200,"kraft_controller_id":902,"kraft_metadata_epoch":502,"kraft_controller_epoch":502}
> {code}
> This epoch was written as the {{controller_epoch}} value in the partition 
> state znode for several partitions, e.g.:
> {code:java}
> get /brokers/topics/__consumer_offsets/partitions/45/state
> {"controller_epoch":502,"leader":2,"version":1,"leader_epoch":56,"isr":[0,1,2]}
> {code}
> Despite this, the value in the {{/controller_epoch}} znode remains much 
> lower, because it is a separate counter:
> {code:java}
> get /controller_epoch
> 226
> {code}
> This led to the following errors when a ZK broker was elected controller on 
> rollback with this lower epoch:
> {code:java}
> message:  [Controller id=0 epoch=226] Controller 0 epoch 226 failed to change state for partition __consumer_offsets-45 from OfflinePartition to OnlinePartition
> throwable: { [-]
>       class:  kafka.common.StateChangeFailedException
>       msg:  Aborted leader election for partition __consumer_offsets-45 since the LeaderAndIsr path was already written by another controller. This probably means that the current controller 0 went through a soft failure and another controller was elected with epoch 502.
>       stack: [ [-]
>         kafka.controller.ZkPartitionStateMachine.$anonfun$doElectLeaderForPartitions$2(PartitionStateMachine.scala:389)
>         scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         kafka.controller.ZkPartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:379)
>         kafka.controller.ZkPartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:336)
>         kafka.controller.ZkPartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:242)
>         kafka.controller.ZkPartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:162)
>         kafka.controller.PartitionStateMachine.triggerOnlineStateChangeForPartitions(PartitionStateMachine.scala:76)
>         kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:61)
>         kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:590)
>         kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1644)
>         kafka.controller.KafkaController.process(KafkaController.scala:2620)
>         kafka.controller.QueuedEvent.process(ControllerEventManager.scala:54)
>         kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:138)
>         kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:141)
>         kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1$adapted(ControllerEventManager.scala:141)
>         com.yammer.metrics.core.Timer.time(Timer.java:91)
>         kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:141)
>         org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
>      ]
>    }
> {code}
>  * Exceptions thrown from:
>  ** {{ReplicaStateMachine.scala}}
>  *** [https://github.com/apache/kafka/blob/5e9866f43ab8e7e41ef39e5584ac50019381328d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L412]
>  ** {{KafkaController.scala}}
>  *** [https://github.com/apache/kafka/blob/5e9866f43ab8e7e41ef39e5584ac50019381328d/core/src/main/scala/kafka/controller/KafkaController.scala#L1262]
>  * KRaft epoch introduced into the partition state znode:
>  ** {{ZkTopicMigrationClient.scala}}
>  *** [https://github.com/apache/kafka/blob/5e9866f43ab8e7e41ef39e5584ac50019381328d/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala#L259]
> h3. Steps to reproduce
>  # Provision a ZooKeeper-based Kafka cluster.
>  # Create a topic with a replication factor greater than 1.
>  # Set up the KRaft controller quorum but do not start the migration yet.
>  # Trigger repeated KRaft leader elections (e.g. by restarting KRaft 
> controller nodes) to raise the KRaft quorum epoch significantly above the 
> value currently stored in the {{/controller_epoch}} znode.
>  # Begin the ZooKeeper to KRaft migration and proceed to the dual-write phase.
>  # Restart the broker that currently leads the topic's partition, causing the 
> KRaft controller to write its (now high) epoch into the partition's state znode.
>  # Roll back to ZooKeeper mode following the steps documented in: 
> [https://kafka.apache.org/39/operations/kraft/#reverting-to-zookeeper-mode-during-the-migration].
>  # Confirm the topic has offline partitions.
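
The check described in the report can be modelled roughly as follows. This is a simplified Python sketch of the comparison only, not the controller's actual Scala code: the function name is hypothetical, the first partition state is the one quoted in the report, and the second partition ("example-topic-0") is invented for contrast.

```python
import json


def partitions_aborted_on_election(partition_states: dict[str, str],
                                   controller_epoch: int) -> list[str]:
    """List partitions whose stored controller_epoch exceeds the epoch of the
    newly elected ZK controller (hypothetical helper modelling the report)."""
    aborted = []
    for partition, raw_state in partition_states.items():
        stored_epoch = json.loads(raw_state)["controller_epoch"]
        # The controller treats a higher stored epoch as proof that a newer
        # controller wrote the znode, so it aborts the leader election.
        if stored_epoch > controller_epoch:
            aborted.append(partition)
    return sorted(aborted)


states = {
    # Quoted from the report: written by the KRaft controller during dual-write.
    "__consumer_offsets-45":
        '{"controller_epoch":502,"leader":2,"version":1,"leader_epoch":56,"isr":[0,1,2]}',
    # Invented for contrast: last written by an older ZK controller.
    "example-topic-0":
        '{"controller_epoch":225,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1,2]}',
}

# 226 is the epoch of the ZK controller shown in the logged error.
aborted = partitions_aborted_on_election(states, controller_epoch=226)
```

Only the partition touched during the dual-write phase is flagged, which matches the report: partitions without state changes during that phase keep a ZK-era epoch and elect leaders normally.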



