junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r2038097007


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2339,80 +2357,90 @@ public void 
testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedExce
 
     @Test
     public void testIdempotentSplitBatchAndSend() throws Exception {
-        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+        Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1");
+        TOPIC_IDS.put("testSplitBatchAndSend", topicId);
+        TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("testSplitBatchAndSend", 1));
         TransactionManager txnManager = createTransactionManager();
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         setupWithTransactionState(txnManager);
         prepareAndReceiveInitProducerId(123456L, Errors.NONE);
         assertTrue(txnManager.hasProducerId());
-        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tpId);
     }
 
     @Test
     public void testTransactionalSplitBatchAndSend() throws Exception {
+        Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1");
+        TOPIC_IDS.put("testSplitBatchAndSend", topicId);

Review Comment:
   Hmm, topicId in this test is the same as the one in 
testIdempotentSplitBatchAndSend. Will this cause confusion during testing since 
TOPIC_IDS accumulates across tests?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -153,9 +155,12 @@ public class SenderTest {
     private static final int DELIVERY_TIMEOUT_MS = 1500;
     private static final long TOPIC_IDLE_MS = 60 * 1000;
 
-    private final TopicPartition tp0 = new TopicPartition("test", 0);
-    private final TopicPartition tp1 = new TopicPartition("test", 1);
-    private final TopicPartition tp2 = new TopicPartition("test", 2);
+    private static final String TOPIC_NAME = "test";
+    private static final Uuid TOPIC_ID = 
Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w");
+    private static final Map<String, Uuid> TOPIC_IDS = new 
HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID));

Review Comment:
   `new HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID));`  => `Map.of(TOPIC_NAME, 
TOPIC_ID)); `?



##########
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala:
##########
@@ -33,8 +33,8 @@ import scala.jdk.CollectionConverters._
 
 class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
   val producerCount: Int = 1
-  val brokerCount: Int = 2
-  val defaultLingerMs: Int = 5;
+  val brokerCount: Int = 3

Review Comment:
   Why does the tests need 3 brokers?



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -94,9 +94,14 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => 
Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => 
resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
     ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+
+      val topicId = topicNames.find { case (_, topicName) => topicName == 
topic}
+        .map { case (topicId, _) => topicId }
+        .getOrElse(Uuid.ZERO_UUID)

Review Comment:
   Hmm, should we only set topicId if version is <= 12? The produce response is 
only keyed on topic name when version is <= 12.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -578,6 +583,27 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def getPartitionOrException(topicIdPartition: TopicIdPartition): Partition = 
{
+    getPartitionOrError(topicIdPartition.topicPartition()) match {
+      case Left(Errors.KAFKA_STORAGE_ERROR) =>
+        throw new KafkaStorageException(s"Partition 
${topicIdPartition.topicPartition()} is in an offline log directory")
+
+      case Left(error) =>
+        throw error.exception(s"Error while fetching partition state for 
${topicIdPartition.topicPartition()}")
+
+      case Right(partition) =>
+        // Get topic id for an existing partition from disk if topicId is none 
get it from the metadata cache
+        val topicId = 
partition.topicId.getOrElse(metadataCache.getTopicId(topicIdPartition.topic()))
+        // If topic id is set to zero or null fall back to non topic id aware 
behaviour

Review Comment:
   Could we adjust the comment since topic id can't be null?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2339,80 +2357,90 @@ public void 
testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedExce
 
     @Test
     public void testIdempotentSplitBatchAndSend() throws Exception {
-        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+        Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1");
+        TOPIC_IDS.put("testSplitBatchAndSend", topicId);
+        TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("testSplitBatchAndSend", 1));
         TransactionManager txnManager = createTransactionManager();
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         setupWithTransactionState(txnManager);
         prepareAndReceiveInitProducerId(123456L, Errors.NONE);
         assertTrue(txnManager.hasProducerId());
-        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
+        testSplitBatchAndSend(txnManager, producerIdAndEpoch, tpId);
     }
 
     @Test
     public void testTransactionalSplitBatchAndSend() throws Exception {
+        Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1");
+        TOPIC_IDS.put("testSplitBatchAndSend", topicId);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
-        TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
+        TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("testSplitBatchAndSend", 1));
+
         TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
 
         txnManager.beginTransaction();
-        txnManager.maybeAddPartition(tp);
-        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, 
Collections.singletonMap(tp, Errors.NONE)));
+        txnManager.maybeAddPartition(tpId.topicPartition());
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, 
(short) 0, (short) 13));

Review Comment:
   Version 0 is no longer supported for produce requests. Could we use 
olderVersion() and latestVersion() ?



##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -112,6 +115,27 @@ public void testBuildWithCurrentMessageFormat() {
         assertEquals(ApiKeys.PRODUCE.latestVersion(), 
requestBuilder.latestAllowedVersion());
     }
 
+    @Test
+    public void testBuildWithCurrentMessageFormatWithoutTopicId() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE,
+                Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
+                new ProduceRequestData()
+                        .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                                        new 
ProduceRequestData.TopicProduceData()
+                                                .setName("test")
+                                                
.setPartitionData(Collections.singletonList(
+                                                        new 
ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build()))))
+                                .iterator()))
+                        .setAcks((short) 1)
+                        .setTimeoutMs(5000),
+                false);
+        assertEquals(3, requestBuilder.oldestAllowedVersion());
+        assertEquals(13, requestBuilder.latestAllowedVersion());

Review Comment:
   13 => PRODUCE.latestVersion()
   3 => PRODUCE.oldestVersion()
   Ditto above.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3380,6 +3412,9 @@ public void 
testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
                         }
                     }));
             Cluster startingMetadataCluster = metadata.fetch();
+            startingMetadataCluster.nodes().forEach(node ->
+                    apiVersions.update(node.idString(), 
NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 13))

Review Comment:
   Version 0 is no longer supported for produce requests. Could we use 
olderVersion() and latestVersion() ? Ditto below.



##########
core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala:
##########
@@ -53,13 +53,14 @@ class ProduceRequestTest extends BaseRequestTest {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
     def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit 
= {
-      val topicPartition = new TopicPartition("topic", partition)
+      val topicId = getTopicIds().get("topic").get
       val produceRequest = ProduceRequest.builder(new ProduceRequestData()
         .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
           new ProduceRequestData.TopicProduceData()
-            .setName(topicPartition.topic())
+            .setName("topic")

Review Comment:
   Should we just set topicId? Ditto below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to