junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r2032049630


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -108,7 +108,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val groupConfigManager: GroupConfigManager
 ) extends ApiRequestHandler with Logging {
 
-  type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
+  type FetchResponseStats = Map[TopicIdPartition, RecordValidationStats]

Review Comment:
   This is an existing issue, but it seems that FetchResponseStats should be 
ProduceResponseStats?



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1690,6 +1690,18 @@ public static ConfigDef mergeConfigs(List<ConfigDef> 
configDefs) {
         configDefs.forEach(configDef -> 
configDef.configKeys().values().forEach(all::define));
         return all;
     }
+
+    /**
+     * Convert a map's keys to another type.
+     */
+    public static <K1, K2, V> Map<K2, V> convertKeys(Map<K1, V> originalMap, 
Function<K1, K2> converter) {

Review Comment:
   This seems unused?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -649,16 +649,17 @@ class TransactionStateManager(brokerId: Int,
 
     val records = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompression, new 
SimpleRecord(timestamp, keyBytes, valueBytes))
     val topicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 
partitionFor(transactionalId))
-    val recordsPerPartition = Map(topicPartition -> records)
+    val transactionStateTopicIdPartition = 
replicaManager.topicIdPartition(topicPartition)

Review Comment:
   Could we rename topicPartition to transactionStateTopicPartition?



##########
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java:
##########
@@ -931,10 +931,18 @@ public void testProduceResponseVersions() throws 
Exception {
                 responseData.setThrottleTimeMs(0);
             }
 
+            if (version >= 13) {
+                responseData.responses().iterator().next().setTopicId(topicId);
+            } else {
+                responseData.responses().iterator().next().setName(topicName);
+            }
+
             if (version >= 3 && version <= 4) {
                 testAllMessageRoundTripsBetweenVersions(version, (short) 5, 
responseData, responseData);
             } else if (version >= 6 && version <= 7) {
                 testAllMessageRoundTripsBetweenVersions(version, (short) 8, 
responseData, responseData);
+            } else if (version < 12) {

Review Comment:
   `< 12` => `<= 12` ?



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -85,4 +85,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata cache on the leader includes the 
partition topic id.
+   */
+  def testSendWithTopicReassignmentIsMidWay(): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))

Review Comment:
   Could we do the test with just 1 partition?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -390,57 +390,72 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+    val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
+    val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, 
ProduceRequestData.PartitionProduceData)]
+
+    produceRequest.data.topicData.forEach { topic =>
+      topic.partitionData.forEach { partition =>
+        val (topicName, topicId) = if (topic.topicId().equals(Uuid.ZERO_UUID)) 
{
+          (topic.name(), metadataCache.getTopicId(topic.name()))
+        } else {
+          (metadataCache.getTopicName(topic.topicId).orElse(topic.name), 
topic.topicId())
+        }
+
+        val topicPartition = new TopicPartition(topicName, partition.index())
+        if (topicName.isEmpty)
+          nonExistingTopicResponses += new TopicIdPartition(topicId, 
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
+        else if (!metadataCache.contains(topicPartition))

Review Comment:
   It seems that we don't need this since it's handled in `ReplicaManager` too.



##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -112,6 +115,27 @@ public void testBuildWithCurrentMessageFormat() {
         assertEquals(ApiKeys.PRODUCE.latestVersion(), 
requestBuilder.latestAllowedVersion());
     }
 
+    @Test
+    public void testBuildWithCurrentMessageFormatWithoutTopicId() {

Review Comment:
   This test now fails.



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -107,15 +112,19 @@ public ProduceRequest(ProduceRequestData 
produceRequestData, short version) {
     }
 
     // visible for testing
-    Map<TopicPartition, Integer> partitionSizes() {
+    Map<TopicIdPartition, Integer> partitionSizes() {
         if (partitionSizes == null) {
             // this method may be called by different thread (see the comment 
on data)
             synchronized (this) {
                 if (partitionSizes == null) {
-                    Map<TopicPartition, Integer> tmpPartitionSizes = new 
HashMap<>();
+                    Map<TopicIdPartition, Integer> tmpPartitionSizes = new 
HashMap<>();
                     data.topicData().forEach(topicData ->
                         topicData.partitionData().forEach(partitionData ->
-                            tmpPartitionSizes.compute(new 
TopicPartition(topicData.name(), partitionData.index()),
+                            // While topic id and name will never be populated 
at the same time in the request, to simplify

Review Comment:
   > While topic id and name will never be populated at the same time in the 
request
   
   This is true on the server side. On the producer side, we actually set both 
the topic name and the topic id. So, it will be useful to qualify that 
statement.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -88,6 +88,9 @@ public MirrorCheckpointTask() {}
         this.topicFilter = topic -> true;
         this.interval = Duration.ofNanos(1);
         this.pollTimeout = Duration.ofNanos(1);
+        // read __offset-sync-topic
+        // update __checkpoint_topic has consumer group offset on 
source/destination
+        // admincliuent -> consumergroupX 100 on source and consumergroupX 20 
on destination

Review Comment:
   Are these comments needed?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;

Review Comment:
   Could we add the new error code UnknownTopicId in the javadoc below?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -534,8 +550,10 @@ public void testNodeLatencyStats() throws Exception {
                 new BufferPool(totalSize, batchSize, m, time, 
"producer-internal-metrics"));
 
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+            apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, 
(short) 0, (short) 13));

Review Comment:
   Version 0 is no longer supported for produce requests. Could we use 
oldestVersion() and latestVersion()? Ditto in a few other places below.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -169,6 +169,9 @@ public void start(Map<String, String> props) {
             "refreshing topics");
         log.info("Started {} with {} topic-partitions.", connectorName, 
knownSourceTopicPartitions.size());
         log.info("Starting {} took {} ms.", connectorName, 
System.currentTimeMillis() - start);
+
+        // your topics -> destination
+        // __offset-sync-topic

Review Comment:
   Are these comments needed?



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -258,18 +263,20 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     new requests.MetadataRequest.Builder(List(topic).asJava, 
allowAutoTopicCreation).build()
   }
 
-  private def createProduceRequest =
+  private def createProduceRequestWithId(id: Uuid) = {
     requests.ProduceRequest.builder(new ProduceRequestData()
       .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
         Collections.singletonList(new ProduceRequestData.TopicProduceData()
-          .setName(tp.topic).setPartitionData(Collections.singletonList(
+          .setTopicId(id).setPartitionData(Collections.singletonList(
           new ProduceRequestData.PartitionProduceData()
             .setIndex(tp.partition)
             .setRecords(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("test".getBytes))))))
         .iterator))
       .setAcks(1.toShort)
       .setTimeoutMs(5000))
       .build()
+  }

Review Comment:
   Add a new line below? 



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -578,6 +583,27 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def getPartitionOrException(topicIdPartition: TopicIdPartition): Partition = 
{
+    getPartitionOrError(topicIdPartition.topicPartition()) match {
+      case Left(Errors.KAFKA_STORAGE_ERROR) =>
+        throw new KafkaStorageException(s"Partition 
${topicIdPartition.topicPartition()} is in an offline log directory")
+
+      case Left(error) =>
+        throw error.exception(s"Error while fetching partition state for 
${topicIdPartition.topicPartition()}")
+
+      case Right(partition) =>
+        // Get topic id for an existing partition from disk if topicId is none 
get it from the metadata cache
+        val topicId = 
partition.topicId.getOrElse(metadataCache.getTopicId(topicIdPartition.topic()))
+        // If topic id is set to zero or null fall back to non topic id aware 
behaviour
+        val topicIdNotProvided = topicIdPartition.topicId() == Uuid.ZERO_UUID 
|| topicIdPartition.topicId() == null

Review Comment:
   It seems that `topicIdPartition.topicId()` is never null? Otherwise, we need 
to do the null check in every place where it's used.



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -54,6 +55,10 @@ public static Builder builder(ProduceRequestData data) {
         return builder(data, false);
     }
 
+    private static boolean canNotSupportTopicId(ProduceRequestData data) {

Review Comment:
   This seems unused?



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -94,9 +94,14 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => 
Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => 
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+
+      val topicId = topicNames.find { case (_, topicName) => topicName == 
topic}
+        .map { case (topicId, _) => topicId }
+        .getOrElse(Uuid.ZERO_UUID)
+      val topicName = if (version >= 12) "" else topic

Review Comment:
   `>= 12` => `>= 13` ?



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -85,4 +85,83 @@ class ProducerSendWhileDeletionTest extends 
IntegrationTestHarness {
     assertEquals(topic, producer.send(new ProducerRecord(topic, null, 
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
   }
 
+  /**
+   * Tests that Producer produce to new topic id after recreation.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata has been updated with new topic id.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testSendWithRecreatedTopic(quorum: String): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    createTopic(topic)
+    val admin = createAdminClient()
+    val topicId = topicMetadata(admin, topic).topicId()
+    val producer = createProducer()
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + 
i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+    // Start topic deletion
+    deleteTopic(topic, listenerName)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(topic, 2, brokers)
+    createTopic(topic)
+    assertNotEquals(topicId, topicMetadata(admin, topic).topicId())
+
+    // Producer should be able to send messages even after topic gets recreated
+    val recordMetadata: RecordMetadata = producer.send(new 
ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get
+    assertEquals(topic, recordMetadata.topic())
+    assertEquals(0, recordMetadata.offset())
+  }
+
+  /**
+   * Tests that Producer produce to topic during reassignment where topic 
metadata change on broker side.
+   *
+   * Producer will attempt to send messages to the partition specified in each 
record, and should
+   * succeed as long as the metadata cache on the leader includes the 
partition topic id.
+   */
+  def testSendWithTopicReassignmentIsMidWay(): Unit = {
+    val numRecords = 10
+    val topic = "topic"
+    val partition0: TopicPartition = new TopicPartition(topic, 0)
+    val partition1 = new TopicPartition(topic, 1)
+    val admin: Admin = createAdminClient()
+
+    // Create topic with leader as 0 for the 2 partitions.
+    createTopicWithAssignment(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+    TestUtils.assertLeader(admin, partition1, 0)
+
+    val topicDetails = topicMetadata(admin, topic)
+    TestUtils.assertLeader(admin, partition1, 0)

Review Comment:
   Not sure why we verify partition1 again.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -171,6 +176,11 @@ public class SenderTest {
     @BeforeEach
     public void setup() {
         setupWithTransactionState(null);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, 
(short) 0, (short) 13));

Review Comment:
   Version 0 is no longer supported for produce requests. Could we use 
`oldestVersion()` and `latestVersion()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to