hachikuji commented on a change in pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#discussion_r803180976



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -110,6 +110,14 @@
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
     ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
+    /**
+     * ALTER_ISR was the old name for ALTER_PARTITION.
+     *
+     * @deprecated since 3.2.0. Use {@link #ALTER_PARTITION} instead
+     */
+    @Deprecated

Review comment:
       Is this necessary? I don't believe this is a public API.

##########
File path: checkstyle/suppressions.xml
##########
@@ -276,6 +276,8 @@
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+    <suppress checks="MethodLength"

Review comment:
       Probably a sign that `alterPartition` is getting out of control. Could we consider adding some helpers instead?

##########
File path: clients/src/main/resources/common/message/AlterPartitionRequest.json
##########
@@ -34,9 +34,11 @@
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch of this partition" },
         { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
-          "about": "The ISR for this partition"},
-        { "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
-          "about": "The expected version of ISR which is being updated"}
+          "about": "The ISR for this partition" },
+        { "name": "LeaderRecoveryState", "type": "int8", "versions": "1+", "default": "0",
+          "about": "1 if the partition is recovering from an unclean leader election; 0 otherwise." },
+        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
+          "about": "The expected epoch of the partition which is being updated" }

Review comment:
       It might be useful to mention that this refers to the zk version for zk clusters? Same question for response.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -147,56 +147,65 @@ sealed trait IsrState {
   def maximalIsr: Set[Int]
 
   /**
-   * Indicates if we have an AlterIsr request inflight.
+   * The leader recovery state. See the description for LeaderRecoveryState for details on the different values.
+   */
+  def leaderRecoveryState: LeaderRecoveryState
+
+  /**
+   * Indicates if we have an AlterPartition request inflight.
    */
   def isInflight: Boolean
 }
 
-sealed trait PendingIsrChange extends IsrState {
+sealed trait PendingPartitionChange extends PartitionState {
   def sentLeaderAndIsr: LeaderAndIsr
+
+  override val leaderRecoveryState: LeaderRecoveryState = LeaderRecoveryState.RECOVERED

Review comment:
       I can't say I like the idea of getting this implicitly. I do get the point that we would not send `AlterPartition` with anything except `LeaderRecoveryState.RECOVERED` (for now), but I think we would tend to overlook the implication when it is hidden here. Can we at least include it in the `toString` implementations?
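One way to surface the implicit value is to print it explicitly. A minimal sketch, assuming a simplified, hypothetical `PendingShrinkIsr` with only two replica-set fields (Kafka's real pending-change classes carry more state):

```java
import java.util.Set;
import java.util.TreeSet;

class PendingChangeToString {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    // Hypothetical, simplified stand-in for a pending ISR shrink; not Kafka's actual class.
    static final class PendingShrinkIsr {
        final Set<Integer> isr;
        final Set<Integer> outOfSyncReplicaIds;
        // Mirrors the implicit default in the diff: pending changes always carry RECOVERED.
        final LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.RECOVERED;

        PendingShrinkIsr(Set<Integer> isr, Set<Integer> outOfSyncReplicaIds) {
            // TreeSet gives a stable, sorted rendering in toString.
            this.isr = new TreeSet<>(isr);
            this.outOfSyncReplicaIds = new TreeSet<>(outOfSyncReplicaIds);
        }

        @Override
        public String toString() {
            // Print the implicit state explicitly so it is visible in logs.
            return "PendingShrinkIsr(isr=" + isr
                + ", outOfSyncReplicaIds=" + outOfSyncReplicaIds
                + ", leaderRecoveryState=" + leaderRecoveryState + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new PendingShrinkIsr(Set.of(1, 2, 3), Set.of(3)));
    }
}
```

Running `main` prints `PendingShrinkIsr(isr=[1, 2, 3], outOfSyncReplicaIds=[3], leaderRecoveryState=RECOVERED)`, so anyone reading the log sees the value that is otherwise hidden in the trait.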

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1305,33 +1305,33 @@ private DescribeUserScramCredentialsResponse createDescribeUserScramCredentialsR
         return new DescribeUserScramCredentialsResponse(data);
     }
 
-    private AlterIsrRequest createAlterIsrRequest(short version) {
-        AlterIsrRequestData data = new AlterIsrRequestData()
+    private AlterPartitionRequest createAlterPartitionRequest(short version) {

Review comment:
       Shall we add tests for the new fields?

##########
File path: core/src/main/scala/kafka/api/LeaderAndIsr.scala
##########
@@ -52,11 +63,12 @@ case class LeaderAndIsr(leader: Int,
     } else if (other == null) {
       false
     } else {
-      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr)
+      leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) &&
+      leaderRecoveryState == other.leaderRecoveryState

Review comment:
       nit: indent this?

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -40,7 +40,14 @@ object Election {
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
-          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+
+          if (!isr.contains(leader)) {
+            // The new leader is not in the old ISR so mark the partition a RECOVERING
+            leaderAndIsr.newRecoveringLeaderAndIsr(leader, newIsr)
+          } else {
+            // Elect a new leader but keep the previous leader recovery state

Review comment:
       Is it possible for the state to be RECOVERING here? 
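A simplified model of the two branches makes the question concrete: the clean branch copies whatever state the partition already had, so it yields RECOVERING whenever the previous state was RECOVERING. The sketch below assumes hypothetical names (`OfflineElectionSketch`, `elect`) and a stripped-down `LeaderAndIsr`; it mirrors only the branch structure in the diff, not the controller's full logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

class OfflineElectionSketch {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    // Hypothetical, simplified stand-in for LeaderAndIsr.
    static final class LeaderAndIsr {
        final int leader;
        final int leaderEpoch;
        final List<Integer> isr;
        final LeaderRecoveryState leaderRecoveryState;

        LeaderAndIsr(int leader, int leaderEpoch, List<Integer> isr, LeaderRecoveryState state) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.isr = isr;
            this.leaderRecoveryState = state;
        }
    }

    // Unclean choice (leader outside the ISR) forces RECOVERING;
    // clean choice copies the partition's previous recovery state unchanged.
    static LeaderAndIsr elect(LeaderAndIsr current, int newLeader, IntPredicate isOnline) {
        if (current.isr.contains(newLeader)) {
            List<Integer> newIsr = new ArrayList<>();
            for (int replica : current.isr) {
                if (isOnline.test(replica)) {
                    newIsr.add(replica);
                }
            }
            return new LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr, current.leaderRecoveryState);
        }
        return new LeaderAndIsr(newLeader, current.leaderEpoch + 1, List.of(newLeader),
            LeaderRecoveryState.RECOVERING);
    }

    public static void main(String[] args) {
        // Replica 3 was elected uncleanly and has not finished recovering.
        LeaderAndIsr afterUnclean = new LeaderAndIsr(3, 7, List.of(3), LeaderRecoveryState.RECOVERING);
        // Replica 3 restarts and is re-elected from the ISR: the clean branch keeps RECOVERING.
        LeaderAndIsr reelected = elect(afterUnclean, 3, replica -> true);
        System.out.println(reelected.leaderRecoveryState);
    }
}
```

Under this model the answer would be yes: a leader that bounces before recovery completes is re-elected "cleanly" while still RECOVERING.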

##########
File path: core/src/main/scala/kafka/controller/Election.scala
##########
@@ -53,17 +60,17 @@ object Election {
    * Elect leaders for new or offline partitions.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param partitionsWithUncleanLeaderElectionState A sequence of tuples representing the partitions
+   * @param partitionsWithUncleanLeaderLeaderRecoveryState A sequence of tuples representing the partitions

Review comment:
       nit: the original name seems fine for what it returns. "LeaderLeader"?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -542,7 +551,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        LeaderRecoveryState.RECOVERED

Review comment:
       Can we at least log a message if the LeaderAndIsr indicates recovery is needed? I think with this implementation, if there is only one replica, the state would remain RECOVERING indefinitely. Do I have that right?
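A sketch of the kind of warning being asked for, using `java.util.logging` for self-containment. The helper name `recoveryWarning` and the message wording are hypothetical; the single-replica remark restates the reviewer's hypothesis rather than confirmed behavior.

```java
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

class RecoveryStateLoggingSketch {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    private static final Logger LOG = Logger.getLogger(RecoveryStateLoggingSketch.class.getName());

    // Returns a warning when the controller says the partition is still recovering
    // but the leader (as in the diff) records RECOVERED locally.
    static Optional<String> recoveryWarning(String topicPartition, int leaderEpoch,
                                            List<Integer> replicas, LeaderRecoveryState requested) {
        if (requested != LeaderRecoveryState.RECOVERING) {
            return Optional.empty();
        }
        String msg = "Leader for " + topicPartition + " at epoch " + leaderEpoch
            + " was elected uncleanly; treating the partition as RECOVERED locally.";
        if (replicas.size() == 1) {
            msg += " With a single replica no ISR change will follow, so the controller may keep it RECOVERING.";
        }
        return Optional.of(msg);
    }

    public static void main(String[] args) {
        recoveryWarning("foo-0", 3, List.of(1), LeaderRecoveryState.RECOVERING)
            .ifPresent(msg -> LOG.warning(msg));
    }
}
```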

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2263,37 +2271,43 @@ class KafkaController(val config: KafkaConfig,
             .groupBy { case (tp, _) => tp.topic }   // Group by topic
             .foreach { case (topic, partitions) =>
               // Add each topic part to the response
-              val topicResp = new AlterIsrResponseData.TopicData()
+              val topicResp = new AlterPartitionResponseData.TopicData()
                 .setName(topic)
                 .setPartitions(new util.ArrayList())
               resp.topics.add(topicResp)
               partitions.foreach { case (tp, errorOrIsr) =>
                 // Add each partition part to the response (new ISR or error)
                 errorOrIsr match {
                   case Left(error) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setErrorCode(error.code))
                   case Right(leaderAndIsr) => topicResp.partitions.add(
-                    new AlterIsrResponseData.PartitionData()
+                    new AlterPartitionResponseData.PartitionData()
                       .setPartitionIndex(tp.partition)
                       .setLeaderId(leaderAndIsr.leader)
                       .setLeaderEpoch(leaderAndIsr.leaderEpoch)
                       .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                      .setCurrentIsrVersion(leaderAndIsr.zkVersion))
+                      .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)

Review comment:
       Do we need to make this field ignorable in the response?
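As we understand the `ignorable` flag in Kafka's message JSON schemas: when the version being written does not include a field, a non-ignorable field holding a non-default value makes serialization fail, while an ignorable one is silently dropped. The hand-written sketch below models only that rule for an int8 field added in version 1; it is not the generated code, and the class and exception names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

class IgnorableFieldSketch {
    // Hypothetical stand-in for the error a generated serializer would raise.
    static final class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    static final byte DEFAULT_LEADER_RECOVERY_STATE = 0;

    // Writes LeaderRecoveryState (an int8 present in versions 1+) for the given version.
    static byte[] write(short version, byte leaderRecoveryState, boolean ignorable) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (version >= 1) {
            out.write(leaderRecoveryState);
        } else if (leaderRecoveryState != DEFAULT_LEADER_RECOVERY_STATE && !ignorable) {
            throw new UnsupportedVersionException(
                "Attempted to write a non-default LeaderRecoveryState at version " + version);
        }
        // Otherwise the field is simply omitted for older versions.
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(write((short) 1, (byte) 1, false).length); // field written
        System.out.println(write((short) 0, (byte) 1, true).length);  // field dropped
    }
}
```

The trade-off: making the field ignorable lets an older peer receive a response that silently loses RECOVERING, while leaving it non-ignorable turns that case into an explicit error.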

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2781,8 +2812,9 @@ case object IsrChangeNotification extends ControllerEvent {
   override def preempt(): Unit = {}
 }
 
-case class AlterIsrReceived(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
-                            callback: AlterIsrCallback) extends ControllerEvent {
+case class AlterPartitionReceived(
+  brokerId: Int, brokerEpoch: Long, partitionssToAlter: Map[TopicPartition, LeaderAndIsr], callback: AlterPartitionCallback

Review comment:
       nit: extra 's' in "partitionss"

##########
File path: core/src/test/scala/kafka/api/LeaderAndIsrTest.scala
##########
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+final class LeaderAndIsrTest {
+  @Test
+  def testRecoveringLeaderAndIsr(): Unit = {
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+    val recoveringLeaderAndIsr = leaderAndIsr.newRecoveringLeaderAndIsr(3, List(3))
+

Review comment:
       nit: extra newline

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -250,23 +260,35 @@ class DefaultAlterIsrManager(
         val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
           new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions().forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val apiError = Errors.forCode(partition.errorCode())
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
-            if (error == Errors.NONE) {
-              val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
-                partition.isr.asScala.toList.map(_.toInt), partition.currentIsrVersion)
-              partitionResponses(tp) = Right(newLeaderAndIsr)
+            if (apiError == Errors.NONE) {
+              try {
+                partitionResponses(tp) = Right(
+                  LeaderAndIsr(
+                    partition.leaderId,
+                    partition.leaderEpoch,
+                    partition.isr.asScala.toList.map(_.toInt),
+                    LeaderRecoveryState.of(partition.leaderRecoveryState),
+                    partition.partitionEpoch
+                  )
+                )
+              } catch {
+                case e: IllegalArgumentException =>

Review comment:
       It might be cleaner to let `LeaderRecoveryState.of` return `Optional<LeaderRecoveryState>`.
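With an `Optional`-returning lookup, the response handler can map an unknown byte to an error without a `try`/`catch`. A minimal sketch; `optionalOf`, `handlePartitionResponse` and the string results are hypothetical stand-ins for the real response types.

```java
import java.util.Optional;

class OptionalRecoveryStateSketch {
    enum LeaderRecoveryState {
        RECOVERED((byte) 0),
        RECOVERING((byte) 1);

        private final byte value;

        LeaderRecoveryState(byte value) {
            this.value = value;
        }

        byte value() {
            return value;
        }

        // Hypothetical Optional-returning lookup, as suggested in the review.
        static Optional<LeaderRecoveryState> optionalOf(byte value) {
            for (LeaderRecoveryState state : values()) {
                if (state.value == value) {
                    return Optional.of(state);
                }
            }
            return Optional.empty();
        }
    }

    // Caller side: an unknown byte becomes an error result instead of a caught exception.
    static String handlePartitionResponse(String topicPartition, byte rawState) {
        return LeaderRecoveryState.optionalOf(rawState)
            .map(state -> topicPartition + " -> " + state)
            .orElseGet(() -> topicPartition + " -> error: unknown leader recovery state " + rawState);
    }

    public static void main(String[] args) {
        System.out.println(handlePartitionResponse("foo-0", (byte) 1));
        System.out.println(handlePartitionResponse("foo-0", (byte) 9));
    }
}
```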

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public enum LeaderRecoveryState {
+    /**
+     * Represent that the election for the partition was either an ISR election or the
+     * leader recovered from an unclean leader election.
+     */
+    RECOVERED((byte) 0),
+
+    /**
+     * Represent that the election for the partition was an unclean leader election and
+     * that the leader is recovering from it.
+     */
+    RECOVERING((byte) 1);
+
+    /**
+     * A special value used to represent that the LeaderRecoveryState field of a
+     * PartitionChangeRecord didn't change.
+     */
+    private static final byte NO_CHANGE = (byte) -1;
+
+    private static final Map<Byte, LeaderRecoveryState> VALUE_TO_ENUM;

Review comment:
       nit: the map seems like overkill with two values
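With only two values, a `switch` covers the lookup without a map. The sketch below also shows how the `NO_CHANGE` sentinel from the diff could be honoured when applying a record; the helper name `changeTo` is hypothetical here.

```java
class TwoValueLookupSketch {
    enum LeaderRecoveryState {
        RECOVERED((byte) 0),
        RECOVERING((byte) 1);

        // Sentinel meaning "field unchanged" in a PartitionChangeRecord (value taken from the diff).
        static final byte NO_CHANGE = (byte) -1;

        private final byte value;

        LeaderRecoveryState(byte value) {
            this.value = value;
        }

        byte value() {
            return value;
        }

        // Direct lookup; no map needed for two values.
        static LeaderRecoveryState of(byte value) {
            switch (value) {
                case 0:
                    return RECOVERED;
                case 1:
                    return RECOVERING;
                default:
                    throw new IllegalArgumentException("Unknown leader recovery state: " + value);
            }
        }

        // Hypothetical helper: apply a record's byte to the current state, keeping it on NO_CHANGE.
        LeaderRecoveryState changeTo(byte recordValue) {
            return recordValue == NO_CHANGE ? this : of(recordValue);
        }
    }

    public static void main(String[] args) {
        System.out.println(LeaderRecoveryState.RECOVERING.changeTo(LeaderRecoveryState.NO_CHANGE));
        System.out.println(LeaderRecoveryState.RECOVERING.changeTo((byte) 0));
    }
}
```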

##########
File path: core/src/test/scala/kafka/zk/TopicPartitionStateZNodeTest.scala
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import TopicPartitionStateZNode.decode
+import TopicPartitionStateZNode.encode
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.zookeeper.data.Stat
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+
+final class TopicPartitionStateZNodeTest {
+
+  @Test
+  def testEncodeDecode(): Unit = {

Review comment:
       Maybe check the default case as well when leader recovery state is not defined?
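The default case amounts to decoding a znode written before the field existed. A minimal sketch over an already-parsed JSON payload, assuming missing means RECOVERED (matching the request schema's `"default": "0"`). The key name `leader_recovery_state` is an assumption here; check what `TopicPartitionStateZNode.encode` actually writes.

```java
import java.util.HashMap;
import java.util.Map;

class ZNodeDefaultSketch {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    // Assumed JSON key; verify against the encoder before relying on it.
    static final String KEY = "leader_recovery_state";

    // json is an already-parsed znode payload (hypothetical shape: key -> value).
    static LeaderRecoveryState decodeRecoveryState(Map<String, Object> json) {
        Object raw = json.get(KEY);
        if (raw == null) {
            // Znodes written before the field existed carry no value: treat them as RECOVERED.
            return LeaderRecoveryState.RECOVERED;
        }
        int value = ((Number) raw).intValue();
        switch (value) {
            case 0:
                return LeaderRecoveryState.RECOVERED;
            case 1:
                return LeaderRecoveryState.RECOVERING;
            default:
                throw new IllegalArgumentException("Unknown leader recovery state: " + value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> legacy = new HashMap<>();
        legacy.put("leader", 1);
        legacy.put("leader_epoch", 4);
        System.out.println(decodeRecoveryState(legacy));
    }
}
```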

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1080,6 +1092,9 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
+    // Check that the partition leader is recovering from an unclean leader election.
+    validateLeaderRecoveryState()

Review comment:
       Are we doing this on writes as well?
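Guarding writes would mean calling the same check on the append path. A minimal sketch with a hypothetical `PartitionStub` and an illustrative `IllegalStateException` (Kafka would presumably return a protocol error instead):

```java
import java.util.ArrayList;
import java.util.List;

class RecoveryGuardSketch {
    enum LeaderRecoveryState { RECOVERED, RECOVERING }

    // Hypothetical partition stub; the exception type is illustrative only.
    static final class PartitionStub {
        LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.RECOVERED;
        private final List<String> log = new ArrayList<>();

        private void validateLeaderRecoveryState() {
            if (leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
                throw new IllegalStateException("Partition leader is still recovering from an unclean election");
            }
        }

        List<String> read() {
            validateLeaderRecoveryState(); // guard on the read path, as in the diff
            return new ArrayList<>(log);
        }

        void append(String record) {
            validateLeaderRecoveryState(); // same guard on the write path
            log.add(record);
        }

        int size() {
            return log.size();
        }
    }

    public static void main(String[] args) {
        PartitionStub partition = new PartitionStub();
        partition.append("a");
        partition.leaderRecoveryState = LeaderRecoveryState.RECOVERING;
        try {
            partition.append("b");
        } catch (IllegalStateException e) {
            System.out.println("write rejected: " + e.getMessage());
        }
    }
}
```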




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
