junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1841285541


##########
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##########
@@ -89,4 +89,4 @@ public synchronized byte maxUsableProduceMagic() {
         return maxUsableProduceMagic;
     }
 
-}
+}

Review Comment:
   No need to change this file.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -727,6 +732,27 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def getPartitionOrException(topicIdPartition: TopicIdPartition): Partition = {

Review Comment:
   getPartitionOrException => partitionOrException ?



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = topicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    val reassignment = Map(
+      partition0 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(1, 2))),
+      partition1 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(1, 2)))
+    )
+
+    // Change assignment of one of the replicas from 0 to 2. Leadership moves be 1.
+    admin.alterPartitionReassignments(reassignment.asJava).all().get()
+    TestUtils.assertLeader(admin, partition1, 1)

Review Comment:
   Should we verify partition0 too?
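   For example (a sketch, assuming partition0's leader also moves to broker 1 after the reassignment):
   ```
   TestUtils.assertLeader(admin, partition0, 1)
   ```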



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -579,8 +595,11 @@ public void testNodeLatencyStats() throws Exception {
                 new BufferPool(totalSize, batchSize, m, time, 
"producer-internal-metrics"));
 
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+            ApiVersions apiVersions1 = new ApiVersions();
+            apiVersions1.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 12));

Review Comment:
   Why do we need to create apiVersions1? It is the same as apiVersions.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1684,6 +1684,16 @@ public static ConfigDef mergeConfigs(List<ConfigDef> configDefs) {
         configDefs.forEach(configDef -> configDef.configKeys().values().forEach(all::define));
         return all;
     }
+
+    /**
+     * Convert a map's keys to another type.
+     */
+    public static <K1, K2, V> Map<K2, V> convertKeys(Map<K1, V> originalMap, Function<K1, K2> converter) {
+        Map<K2, V> newMap = new HashMap<>();
+        originalMap.forEach((key, value) -> newMap.put(converter.apply(key), value));
+        return newMap;
+    }

Review Comment:
   The code is probably a bit simpler if written as
   ```
   return originalMap.entrySet().stream()
       .collect(Collectors.toMap(
           entry -> converter.apply(entry.getKey()),
           Map.Entry::getValue
       ));
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1486,22 +1513,24 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
-                   _: KafkaStorageException) =>
-            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
+                   _: KafkaStorageException |
+                   _: UnknownTopicIdException) =>
+            (topicOptionalIdPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))

Review Comment:
   Since the passed-in entriesPerPartition always contains a topicId, could we just return Map[TopicIdPartition, LogAppendResult] and get rid of TopicOptionalIdPartition?
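   If so, the error cases above could build the result directly on the passed-in TopicIdPartition, e.g. (sketch):
   ```
   (topicIdPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
   ```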



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -674,7 +689,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (request.header.apiVersion >= 10) {
             status.error match {
               case Errors.NOT_LEADER_OR_FOLLOWER =>
-                val leaderNode = getCurrentLeader(topicPartition, request.context.listenerName)
+                val leaderNode = getCurrentLeader(topicPartition.topicPartition(), request.context.listenerName)

Review Comment:
   topicPartition => topicIdPartition ?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1486,22 +1513,24 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
-                   _: KafkaStorageException) =>
-            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
+                   _: KafkaStorageException |
+                   _: UnknownTopicIdException) =>
+            (topicOptionalIdPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
           case rve: RecordValidationException =>
-            val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
+            val logStartOffset = processFailedRecord(topicIdPartition, rve.invalidException)
             val recordErrors = rve.recordErrors
-            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
+            (topicOptionalIdPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
               Some(rve.invalidException), hasCustomErrorMessage = true))
           case t: Throwable =>
-            val logStartOffset = processFailedRecord(topicPartition, t)
-            (new TopicOptionalIdPartition(Optional.empty(), topicPartition), LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+            val logStartOffset = processFailedRecord(topicIdPartition, t)
+            (topicOptionalIdPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
               Some(t), hasCustomErrorMessage = false))
         }
       }
     }
   }
 
+

Review Comment:
   extra newline



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -49,7 +49,7 @@ import scala.collection.mutable
 /**
  * Transaction state manager is part of the transaction coordinator, it 
manages:
  *
- * 1. the transaction log, which is a special internal topic.
+ * 1. the transaction log, which is a special internal topic.˚

Review Comment:
   unintended change?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2421,16 +2441,21 @@ private void testSplitBatchAndSend(TransactionManager 
txnManager,
         String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 
0.2f);
+        ApiVersions apiVersions = new ApiVersions();

Review Comment:
   This overwrites the instance-level one. Perhaps use a different name to make that clear?



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -2564,41 +2562,56 @@ private ProduceRequest createProduceRequest(short 
version) {
                     .setAcks((short) -1)
                     .setTimeoutMs(123)
                     .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(singletonList(
-                            new ProduceRequestData.TopicProduceData()
-                                    .setName("topic1")
-                                    .setPartitionData(singletonList(new 
ProduceRequestData.PartitionProduceData()
-                                            .setIndex(1)
-                                            
.setRecords(records)))).iterator()));
+                            createTopicProduceData(version, records, new 
TopicIdPartition(Uuid.ZERO_UUID, 1, "topic1"))
+                    ).iterator()));
             return new ProduceRequest.Builder(version, version, 
data).build(version);
         }
+
         byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : 
RecordBatch.MAGIC_VALUE_V2;
         MemoryRecords records = MemoryRecords.withRecords(magic, 
Compression.NONE, new SimpleRecord("woot".getBytes()));
+        TopicIdPartition topicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), 0, "test");
         return ProduceRequest.forMagic(magic,
                 new ProduceRequestData()
-                        .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(singletonList(
-                                new ProduceRequestData.TopicProduceData()
-                                        .setName("test")
-                                        .setPartitionData(singletonList(new 
ProduceRequestData.PartitionProduceData()
-                                                .setIndex(0)
-                                                
.setRecords(records)))).iterator()))
+                        .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(
+                                singletonList(createTopicProduceData(version, 
records, topicIdPartition)).iterator()
+                        ))
                         .setAcks((short) 1)
                         .setTimeoutMs(5000)
                         .setTransactionalId(version >= 3 ? "transactionalId" : 
null))
                 .build(version);
     }
 
+    private static ProduceRequestData.TopicProduceData createTopicProduceData(short version, MemoryRecords records, TopicIdPartition tp) {
+        ProduceRequestData.TopicProduceData topicProduceData = new ProduceRequestData.TopicProduceData()
+                .setPartitionData(singletonList(new ProduceRequestData.PartitionProduceData()
+                        .setIndex(tp.partition())
+                        .setRecords(records)));
+        if (version >= 12) {
+            topicProduceData.setTopicId(tp.topicId());
+        } else {
+            topicProduceData.setName(tp.topic());
+        }
+        return topicProduceData;
+    }
+
+    private static TopicIdPartition createTopicIdPartition(Uuid topicId, int partitionIndex) {
+        return new TopicIdPartition(topicId, partitionIndex, "");

Review Comment:
   Why is topic name an empty string?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1024,12 +1051,12 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords],
-                                             responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicIdPartition, MemoryRecords],
+                                             responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit): Unit = {

Review Comment:
   There is an existing issue with indentation.



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -92,9 +92,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => 
Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => 
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+      val topicId = topicNames.find(topicName => topicName._2 == topic).map(_._1).getOrElse(Uuid.ZERO_UUID)

Review Comment:
   Could we use `find { case (topicId, topicName) => ... }` to avoid using unnamed references? Ditto below.
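   For example (a sketch; names are illustrative):
   ```
   val topicId = topicNames
     .find { case (_, topicName) => topicName == topic }
     .map { case (topicId, _) => topicId }
     .getOrElse(Uuid.ZERO_UUID)
   ```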



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -263,18 +265,20 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     new requests.MetadataRequest.Builder(List(topic).asJava, 
allowAutoTopicCreation).build()
   }
 
-  private def createProduceRequest =
+  private def createProduceRequestWithId(id: Uuid) = {
     requests.ProduceRequest.forCurrentMagic(new ProduceRequestData()
       .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
         Collections.singletonList(new ProduceRequestData.TopicProduceData()
-          .setName(tp.topic).setPartitionData(Collections.singletonList(
+          .setName(tp.topic).setTopicId(id).setPartitionData(Collections.singletonList(

Review Comment:
   Do we still need to set topic name?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2892,18 +2914,21 @@ class KafkaApisTest extends Logging {
     val topic = "topic"
     val transactionalId = "txn1"
 
-    addTopicToMetadataCache(topic, numPartitions = 2)
+    val topicId = Uuid.fromString("d2Gg8tgzJa2JYK2eTHUapg")
+    val tp = new TopicIdPartition(topicId, 0, "topic")
+    addTopicToMetadataCache(topic, numPartitions = 2, topicId = tp.topicId())
 
     for (version <- 3 to ApiKeys.PRODUCE.latestVersion) {
 
       reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, 
requestChannel, txnCoordinator)
 
-      val tp = new TopicPartition("topic", 0)

Review Comment:
   We now have an extra blank line.



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = topicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    val reassignment = Map(
+      partition0 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(1, 2))),
+      partition1 -> Optional.of(new 
NewPartitionReassignment(util.Arrays.asList(1, 2)))
+    )
+
+    // Change assignment of one of the replicas from 0 to 2. Leadership moves 
be 1.
+    admin.alterPartitionReassignments(reassignment.asJava).all().get()
+    TestUtils.assertLeader(admin, partition1, 1)
+    assertEquals(topicDetails.topicId(), topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets 
reassigned
+    assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
+  }
+
+  def topicMetadata(admin: Admin, topic: String): TopicDescription = {

Review Comment:
   Could this be private?



##########
core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala:
##########
@@ -129,10 +129,13 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val version = ApiKeys.PRODUCE.latestVersion: Short
     val (serializedBytes, responseHeaderVersion) = {
       val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, "", 
correlationId)
+      val topicId = getTopicIds().getOrElse(topicPartition.topic(), Uuid.ZERO_UUID)
       val request = requests.ProduceRequest.forCurrentMagic(new 
ProduceRequestData()
         .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
           Collections.singletonList(new ProduceRequestData.TopicProduceData()
-            .setName(topicPartition.topic()).setPartitionData(Collections.singletonList(
+            .setName(topicPartition.topic())

Review Comment:
   Do we still need to set topic name? Ditto in a few other tests.



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the metadata on the leader has been updated with new topic id.

Review Comment:
   The comment is inaccurate since reassigning a partition doesn't change the topic id.



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -84,4 +84,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata on the leader has been updated with new 
topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = topicMetadata(admin, topic)
+    assertEquals(0, topicDetails.partitions().get(0).leader().id())

Review Comment:
   Why do we verify the leader here using a different method from the one in 
line 138?
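   For example, the same helper could be used for both partitions (sketch):
   ```
   TestUtils.assertLeader(admin, partition0, 0)
   TestUtils.assertLeader(admin, partition1, 0)
   ```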



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3186,7 +3191,7 @@ class ReplicaManagerTest {
       requiredAcks = requiredAcks,
       internalTopicsAllowed = false,
       transactionalId = transactionalId,
-      entriesPerPartition = entriesPerPartition,
+      entriesPerPartition = entriesPerPartition.map(e => replicaManager.topicIdPartition(e._1) -> e._2),

Review Comment:
   Could we use `map { case ... }` to avoid unnamed references?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2468,7 +2483,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             // Otherwise, the regular appendRecords path is used for all the 
non __consumer_offsets
             // partitions or for all partitions when the new group coordinator 
is disabled.
-            controlRecords += partition -> MemoryRecords.withEndTransactionMarker(
+            controlRecords += replicaManager.topicIdPartition(partition) -> MemoryRecords.withEndTransactionMarker(

Review Comment:
   We could leave it as is. However, it could be useful to document for `replicaManager.appendRecords` whether the passed-in partitions can have a topicId of 0 or not.
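   For example, the scaladoc could spell this out (a sketch, assuming we decide that zero topic ids are not allowed):
   ```
   /**
    * @param entriesPerPartition the records to append per partition; the topic id in each
    *                            TopicIdPartition must not be Uuid.ZERO_UUID
    */
   ```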


