junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1785440699


##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,94 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = getTopicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, getTopicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = getTopicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    val reassignment = Map(
+      partition0 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1))),
+      partition1 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1)))
+    )
+
+    // Change assignment of one of the replicas from 0 to 2
+    admin.alterPartitionReassignments(reassignment.asJava).all().get()
+
+    TestUtils.waitUntilTrue(
+      () => partitionLeader(admin, partition0) == 2 && partitionLeader(admin, 
partition1) == 2,

Review Comment:
   It's possible that the initial leader is broker 1. In that case, 
reassignment doesn't immediately change the leader to 2. We could probably 
simplify the test to change replicas from (0) to (1).



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -923,6 +943,15 @@ private void sendProduceRequest(long now, int destination, 
short acks, int timeo
         log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
     }
 
+    private Map<String, Uuid> getTopicIdsForBatches(List<ProducerBatch> 
batches) {

Review Comment:
   We don't use getters. So this can just be `topicIdsForBatches`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -647,6 +647,11 @@ class ReplicaManager(val config: KafkaConfig,
     errorMap
   }
 
+  def getTopicIdPartition(topicPartition: TopicPartition): TopicIdPartition = {

Review Comment:
   We don't use getters. So this can just be `topicIdPartition`.



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,94 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))

Review Comment:
   4.0 won't support ZK, so we could exclude `zk` from the parameterized values. Ditto below.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -93,6 +95,7 @@ class GroupMetadataManagerTest {
       config.groupCoordinatorConfig.offsetCommitTimeoutMs)
   }
 
+

Review Comment:
   Extra new line.



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,94 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = getTopicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, getTopicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = getTopicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    val reassignment = Map(
+      partition0 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1))),
+      partition1 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1)))
+    )
+
+    // Change assignment of one of the replicas from 0 to 2
+    admin.alterPartitionReassignments(reassignment.asJava).all().get()
+
+    TestUtils.waitUntilTrue(
+      () => partitionLeader(admin, partition0) == 2 && partitionLeader(admin, 
partition1) == 2,
+      s"Expected preferred leader to become 2, but is ${partitionLeader(admin, 
partition0)} and ${partitionLeader(admin, partition1)}",
+      10000)
+    TestUtils.assertLeader(admin, partition1, 2)
+    assertEquals(topicDetails.topicId(), getTopicMetadata(admin, 
topic).topicId())
+
+    // Producer should be able to send messages even after topic gets 
reassigned
+    assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
+  }
+
+  def getTopicMetadata(admin: Admin, topic: String): TopicDescription = {
+    
admin.describeTopics(util.Collections.singletonList(topic)).allTopicNames().get().get(topic)
+  }
+
+  def partitionLeader(admin: Admin, topicPartition: TopicPartition): Int = {

Review Comment:
   Could it be private?



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,94 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = getTopicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, getTopicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = getTopicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    val reassignment = Map(
+      partition0 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1))),
+      partition1 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(2, 1)))
+    )
+
+    // Change assignment of one of the replicas from 0 to 2
+    admin.alterPartitionReassignments(reassignment.asJava).all().get()
+
+    TestUtils.waitUntilTrue(
+      () => partitionLeader(admin, partition0) == 2 && partitionLeader(admin, 
partition1) == 2,
+      s"Expected preferred leader to become 2, but is ${partitionLeader(admin, 
partition0)} and ${partitionLeader(admin, partition1)}",
+      10000)
+    TestUtils.assertLeader(admin, partition1, 2)
+    assertEquals(topicDetails.topicId(), getTopicMetadata(admin, 
topic).topicId())
+
+    // Producer should be able to send messages even after topic gets 
reassigned
+    assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
+  }
+
+  def getTopicMetadata(admin: Admin, topic: String): TopicDescription = {

Review Comment:
   We don't use getters. So this can just be `topicMetadata`. Also, could it be 
private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to