dajac commented on a change in pull request #11459: URL: https://github.com/apache/kafka/pull/11459#discussion_r750071143
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##########

@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
             if (partitionState.leader != localBrokerId)
               topicIdUpdateFollowerPartitions.add(partition)
             Errors.NONE
+          case None if logTopicId.isDefined =>
+            // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but

Review comment:
   Should we log something in this case as well?
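   For illustration, a minimal sketch of what such a log line could look like, assuming `stateChangeLogger` is reachable from this match as it is elsewhere in `ReplicaManager` (the message wording is an assumption, not part of the PR):

       case None if logTopicId.isDefined =>
         // Sketch only: record why the topic ID is being dropped from the follower fetch state.
         stateChangeLogger.info(s"Topic ID ${logTopicId.get} for partition ${partition.topicPartition} is present " +
           "in the log but missing from the LeaderAndIsr request, so the controller has likely moved back to an " +
           "older IBP. Removing the topic ID from the follower's PartitionFetchState.")
         if (partitionState.leader != localBrokerId)
           topicIdUpdateFollowerPartitions.add(partition)
         Errors.NONE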
########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+  override def brokerCount: Int = 3
+  override def generateConfigs: Seq[KafkaConfig] = {
+    // Brokers should start with newer IBP and downgrade to the older one.

Review comment:
   I guess that it should be the controller instead of the brokers here.

########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+  override def generateConfigs: Seq[KafkaConfig] = {
+    // Brokers should start with newer IBP and downgrade to the older one.
+    Seq(
+        createConfig(0, KAFKA_3_1_IV0),
+        createConfig(1, KAFKA_3_1_IV0),
+        createConfig(2, KAFKA_2_7_IV0)

Review comment:
   nit: Indentation seems to be off for these lines.
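   For illustration, applying both nits above, the block would presumably read as follows (a sketch assuming the file's two-space indentation convention; not the author's final wording):

       override def generateConfigs: Seq[KafkaConfig] = {
         // The controller should start with the newer IBP and then move to a broker with the older one.
         Seq(
           createConfig(0, KAFKA_3_1_IV0),
           createConfig(1, KAFKA_3_1_IV0),
           createConfig(2, KAFKA_2_7_IV0)
         )
       }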
########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+  @Test
+  def testTopicIdsInFetcherOldController(): Unit = {
+    val topic = "topic"
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    ensureControllerIn(Seq(0))
+    assertEquals(0, controllerSocketServer.config.brokerId)
+    val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
+    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
+    ensureControllerIn(Seq(2))
+    assertEquals(2, controllerSocketServer.config.brokerId)
+
+    assertEquals(1, partitionLeaders(0))
+    assertEquals(0, partitionLeaders(1))
+
+    val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes)
+    val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes)
+    producer.send(record1)
+    producer.send(record2)
+
+    consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))

Review comment:
   nit: Could we define the `TopicPartition`s at the top and use them everywhere?
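   For illustration, a minimal sketch of that refactor (the field names `tp0` and `tp1` are placeholders, not from the PR):

       private val topic = "topic"
       private val tp0 = new TopicPartition(topic, 0)
       private val tp1 = new TopicPartition(topic, 1)

       @Test
       def testTopicIdsInFetcherOldController(): Unit = {
         // ...
         val partitionLeaders = createTopic(topic, Map(tp0.partition -> Seq(1, 0, 2), tp1.partition -> Seq(0, 2, 1)))
         // ...
         consumer.assign(asList(tp0, tp1))
         // ...
       }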
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##########

@@ -1401,6 +1401,12 @@ class ReplicaManager(val config: KafkaConfig,
             if (partitionState.leader != localBrokerId)
               topicIdUpdateFollowerPartitions.add(partition)
             Errors.NONE
+          case None if logTopicId.isDefined =>
+            // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but
+            // are now downgrading. If we are a follower, remove the topic ID from the PartitionFetchState.
+            if (partitionState.leader != localBrokerId)
+              topicIdUpdateFollowerPartitions.add(partition)

Review comment:
   As we don't remove the topic ID from the log in this case, we will always hit this branch as long as the controller uses an older IBP and sends a LISR with the same epoch. This is definitely not a common case, but I wonder if we need to do something more about it.

########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+  @Test
+  def testTopicIdsInFetcherOldController(): Unit = {
+    val topic = "topic"
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    ensureControllerIn(Seq(0))
+    assertEquals(0, controllerSocketServer.config.brokerId)
+    val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))

Review comment:
   nit: There is an extra space before `Map`. For my understanding, why are we using two partitions here?

########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+  @Test
+  def testTopicIdsInFetcherOldController(): Unit = {

Review comment:
   nit: Could we find a name that better describes the case? Perhaps `testTopicIdIsRemovedFromFetcherWhenControllerDowngrades`.
########## File path: core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala ##########

@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+  override def brokerCount: Int = 3
+  override def generateConfigs: Seq[KafkaConfig] = {
+    // Brokers should start with newer IBP and downgrade to the older one.
+    Seq(
+        createConfig(0, KAFKA_3_1_IV0),
+        createConfig(1, KAFKA_3_1_IV0),
+        createConfig(2, KAFKA_2_7_IV0)
+    )
+  }
+
+  @Test
+  def testTopicIdsInFetcherOldController(): Unit = {
+    val topic = "topic"
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    ensureControllerIn(Seq(0))
+    assertEquals(0, controllerSocketServer.config.brokerId)
+    val partitionLeaders = createTopic(topic,  Map(0 -> Seq(1, 0, 2), 1 -> Seq(0, 2, 1)))
+    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
+    ensureControllerIn(Seq(2))
+    assertEquals(2, controllerSocketServer.config.brokerId)
+
+    assertEquals(1, partitionLeaders(0))
+    assertEquals(0, partitionLeaders(1))
+
+    val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, "value".getBytes)
+    val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, "value".getBytes)
+    producer.send(record1)
+    producer.send(record2)
+
+    consumer.assign(asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)))
+    val count = consumer.poll(Duration.ofMillis(5000)).count() + consumer.poll(Duration.ofMillis(5000)).count()
+    assertEquals(2, count)
+  }
+
+  private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
+    while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
+      zkClient.deleteController(ZkVersion.MatchAnyVersion)
+      TestUtils.waitUntilControllerElected(zkClient)
+    }
+  }
+
+  private def createConfig(nodeId: Int, interBrokerVersion: ApiVersion): KafkaConfig = {
+    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerVersion.version)
+    KafkaConfig.fromProps(props)
+  }
+
+}

Review comment:
   nit: Could we add an empty line?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org