junrao commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1805350198
########## clients/src/main/java/org/apache/kafka/clients/ApiVersions.java: ########## @@ -88,5 +88,4 @@ private byte computeMaxUsableProduceMagic() { public synchronized byte maxUsableProduceMagic() { return maxUsableProduceMagic; } - Review Comment: Is this change needed? ########## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ########## @@ -154,7 +154,7 @@ class CoordinatorPartitionWriter( actionQueue = directActionQueue ) - val partitionResult = appendResults.getOrElse(tp, + val partitionResult = appendResults.getOrElse(replicaManager.topicIdPartition(tp), Review Comment: We call `replicaManager.topicIdPartition(tp)` earlier. Should we just reuse the result? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1410,34 +1416,34 @@ class ReplicaManager(val config: KafkaConfig, if (traceEnabled) trace(s"Append [$entriesPerPartition] to local log") - entriesPerPartition.map { case (topicPartition, records) => - brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() + entriesPerPartition.map { case (topicIdPartition, records) => + brokerTopicStats.topicStats(topicIdPartition.topic).totalProduceRequestRate.mark() brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark() // reject appending to internal topics if it is not allowed - if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { - (topicPartition, LogAppendResult( + if (Topic.isInternal(topicIdPartition.topic) && !internalTopicsAllowed) { + (topicIdPartition, LogAppendResult( LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, - Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")), + Some(new InvalidTopicException(s"Cannot append to internal topic ${topicIdPartition.topic}")), hasCustomErrorMessage = false)) } else { try { - val partition = getPartitionOrException(topicPartition) + val partition = getPartitionOrException(topicIdPartition.topicPartition()) Review Comment: In the long term, we probably want to consolidate everything to finding partition by topicId. In the interim, perhaps it's useful to further verify that the topicId in the input matches the one in `Partition`. We could create a new method like `partitionOrException` that does this verification and start using it where the caller has topicId. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2468,7 +2483,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets // partitions or for all partitions when the new group coordinator is disabled. - controlRecords += partition -> MemoryRecords.withEndTransactionMarker( + controlRecords += replicaManager.topicIdPartition(partition) -> MemoryRecords.withEndTransactionMarker( Review Comment: `replicaManager.topicIdPartition` could have zero for topicId. Should we return an error in that case? Every caller to `replicaManager.appendRecords` needs to deal with this issue. Is it better to deal with this inside `replicaManager.appendRecords` or in every caller? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -611,7 +612,10 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, // This will be set by completeBatch. Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { - TopicPartition tp = new TopicPartition(r.name(), p.index()); + + // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. + String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name()); Review Comment: Hmm, this doesn't work for responses of V11 or below, right? Perhaps we should check topicId is not zero before using it. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -623,40 +623,55 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() - val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() - val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]() - val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() + val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() + val nonExistingTopicResponses = mutable.Map[TopicIdPartition, PartitionResponse]() + val invalidRequestResponses = mutable.Map[TopicIdPartition, PartitionResponse]() + val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]() + val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, ProduceRequestData.PartitionProduceData)] + + produceRequest.data.topicData.forEach { topic => + topic.partitionData.forEach { partition => + val (topicName, topicId) = if (topic.topicId().equals(Uuid.ZERO_UUID)) { + (topic.name(), metadataCache.getTopicId(topic.name())) + } else { + (metadataCache.getTopicName(topic.topicId).getOrElse(topic.name), topic.topicId()) + } + + val topicPartition = new TopicPartition(topicName, partition.index()) + if (topicName.isEmpty) Review Comment: Should we check that topicId is zero here too? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -866,17 +872,17 @@ class ReplicaManager(val config: KafkaConfig, s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")) case _ => None } - topicPartition -> LogAppendResult( + new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), Uuid.ZERO_UUID), topicPartition) -> LogAppendResult( LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(customException.getOrElse(error.exception)), hasCustomErrorMessage = customException.isDefined ) } - val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) } + val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.exists(_._1.topicPartition() == key.topicPartition()) } Review Comment: Could we just use `errorResults.contains(key)`? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2485,7 +2500,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestLocal = requestLocal, responseCallback = errors => { errors.foreachEntry { (tp, partitionResponse) => - markerResults.put(tp, partitionResponse.error) + markerResults.put(tp.topicPartition(), partitionResponse.error) Review Comment: tp => topicIdPartition? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -892,11 +897,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo // which is supporting the new magic version to one which doesn't, then we will need to convert. if (!records.hasMatchingMagic(minUsedMagic)) records = batch.records().downConvert(minUsedMagic, 0, time).records(); - ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic()); + ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic(), topicIds.get(tp.topic())); + if (tpData == null) { - tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic()); + Uuid topicId = metadata.topicIds().getOrDefault(tp.topic(), Uuid.ZERO_UUID); Review Comment: Could we just get topicId from topicIds once and reuse it here? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1447,15 +1453,15 @@ class ReplicaManager(val config: KafkaConfig, _: RecordBatchTooLargeException | _: CorruptRecordException | _: KafkaStorageException) => - (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false)) + (topicIdPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false)) case rve: RecordValidationException => - val logStartOffset = processFailedRecord(topicPartition, rve.invalidException) + val logStartOffset = processFailedRecord(topicIdPartition.topicPartition(), rve.invalidException) Review Comment: Could we change `processFailedRecord()` to take topicIdPartition instead of topicPartition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org