junrao commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r2038097007
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -2339,80 +2357,90 @@ public void testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedExce @Test public void testIdempotentSplitBatchAndSend() throws Exception { - TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1); + Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1"); + TOPIC_IDS.put("testSplitBatchAndSend", topicId); + TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("testSplitBatchAndSend", 1)); TransactionManager txnManager = createTransactionManager(); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); setupWithTransactionState(txnManager); prepareAndReceiveInitProducerId(123456L, Errors.NONE); assertTrue(txnManager.hasProducerId()); - testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); + testSplitBatchAndSend(txnManager, producerIdAndEpoch, tpId); } @Test public void testTransactionalSplitBatchAndSend() throws Exception { + Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1"); + TOPIC_IDS.put("testSplitBatchAndSend", topicId); Review Comment: Hmm, topicId in this test is the same as the one in testIdempotentSplitBatchAndSend. Will this cause confusion during testing since TOPIC_IDS accumulates across tests? 
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -153,9 +155,12 @@ public class SenderTest { private static final int DELIVERY_TIMEOUT_MS = 1500; private static final long TOPIC_IDLE_MS = 60 * 1000; - private final TopicPartition tp0 = new TopicPartition("test", 0); - private final TopicPartition tp1 = new TopicPartition("test", 1); - private final TopicPartition tp2 = new TopicPartition("test", 2); + private static final String TOPIC_NAME = "test"; + private static final Uuid TOPIC_ID = Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w"); + private static final Map<String, Uuid> TOPIC_IDS = new HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID)); Review Comment: `new HashMap<>(Map.of(TOPIC_NAME, TOPIC_ID));` => `Map.of(TOPIC_NAME, TOPIC_ID);`? ########## core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala: ########## @@ -33,8 +33,8 @@ import scala.jdk.CollectionConverters._ class ProducerSendWhileDeletionTest extends IntegrationTestHarness { val producerCount: Int = 1 - val brokerCount: Int = 2 - val defaultLingerMs: Int = 5; + val brokerCount: Int = 3 Review Comment: Why do the tests need 3 brokers? ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -94,9 +94,14 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val requestKeyToError = (topicNames: Map[Uuid, String], version: Short) => Map[ApiKeys, Nothing => Errors]( ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => { + + val topicId = topicNames.find { case (_, topicName) => topicName == topic} + .map { case (topicId, _) => topicId } + .getOrElse(Uuid.ZERO_UUID) Review Comment: Hmm, should we only set topicId if version is <= 12? The produce response is only keyed on topic name when version is <= 12. 
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -578,6 +583,27 @@ class ReplicaManager(val config: KafkaConfig, } } + def getPartitionOrException(topicIdPartition: TopicIdPartition): Partition = { + getPartitionOrError(topicIdPartition.topicPartition()) match { + case Left(Errors.KAFKA_STORAGE_ERROR) => + throw new KafkaStorageException(s"Partition ${topicIdPartition.topicPartition()} is in an offline log directory") + + case Left(error) => + throw error.exception(s"Error while fetching partition state for ${topicIdPartition.topicPartition()}") + + case Right(partition) => + // Get topic id for an existing partition from disk if topicId is none get it from the metadata cache + val topicId = partition.topicId.getOrElse(metadataCache.getTopicId(topicIdPartition.topic())) + // If topic id is set to zero or null fall back to non topic id aware behaviour Review Comment: Could we adjust the comment since topic id can't be null? ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -2339,80 +2357,90 @@ public void testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedExce @Test public void testIdempotentSplitBatchAndSend() throws Exception { - TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1); + Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1"); + TOPIC_IDS.put("testSplitBatchAndSend", topicId); + TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("testSplitBatchAndSend", 1)); TransactionManager txnManager = createTransactionManager(); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); setupWithTransactionState(txnManager); prepareAndReceiveInitProducerId(123456L, Errors.NONE); assertTrue(txnManager.hasProducerId()); - testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); + testSplitBatchAndSend(txnManager, producerIdAndEpoch, tpId); } @Test public void testTransactionalSplitBatchAndSend() 
throws Exception { + Uuid topicId = Uuid.fromString("2J9hK8m1wHMKjXfIkQyXx1"); + TOPIC_IDS.put("testSplitBatchAndSend", topicId); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); - TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1); + TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("testSplitBatchAndSend", 1)); + TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100, apiVersions); setupWithTransactionState(txnManager); doInitTransactions(txnManager, producerIdAndEpoch); txnManager.beginTransaction(); - txnManager.maybeAddPartition(tp); - client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp, Errors.NONE))); + txnManager.maybeAddPartition(tpId.topicPartition()); + apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 13)); Review Comment: Version 0 is no longer supported for produce requests. Could we use oldestVersion() and latestVersion()? 
########## clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java: ########## @@ -112,6 +115,27 @@ public void testBuildWithCurrentMessageFormat() { assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion()); } + @Test + public void testBuildWithCurrentMessageFormatWithoutTopicId() { + ByteBuffer buffer = ByteBuffer.allocate(256); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, + Compression.NONE, TimestampType.CREATE_TIME, 0L); + builder.append(10L, null, "a".getBytes()); + ProduceRequest.Builder requestBuilder = ProduceRequest.builder( + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("test") + .setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(9).setRecords(builder.build())))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000), + false); + assertEquals(3, requestBuilder.oldestAllowedVersion()); + assertEquals(13, requestBuilder.latestAllowedVersion()); Review Comment: 13 => PRODUCE.latestVersion() 3 => PRODUCE.oldestVersion() Ditto above. ########## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ########## @@ -3380,6 +3412,9 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader } })); Cluster startingMetadataCluster = metadata.fetch(); + startingMetadataCluster.nodes().forEach(node -> + apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 13)) Review Comment: Version 0 is no longer supported for produce requests. Could we use oldestVersion() and latestVersion()? Ditto below. 
########## core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala: ########## @@ -53,13 +53,14 @@ class ProduceRequestTest extends BaseRequestTest { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit = { - val topicPartition = new TopicPartition("topic", partition) + val topicId = getTopicIds().get("topic").get val produceRequest = ProduceRequest.builder(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( new ProduceRequestData.TopicProduceData() - .setName(topicPartition.topic()) + .setName("topic") Review Comment: Should we just set topicId? Ditto below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org