junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1805350198


##########
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##########
@@ -88,5 +88,4 @@ private byte computeMaxUsableProduceMagic() {
     public synchronized byte maxUsableProduceMagic() {
         return maxUsableProduceMagic;
     }
-

Review Comment:
   Is this change needed?



##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -154,7 +154,7 @@ class CoordinatorPartitionWriter(
       actionQueue = directActionQueue
     )
 
-    val partitionResult = appendResults.getOrElse(tp,
+    val partitionResult = 
appendResults.getOrElse(replicaManager.topicIdPartition(tp),

Review Comment:
   We call `replicaManager.topicIdPartition(tp)` earlier. Should we just reuse 
the result?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1410,34 +1416,34 @@ class ReplicaManager(val config: KafkaConfig,
     if (traceEnabled)
       trace(s"Append [$entriesPerPartition] to local log")
 
-    entriesPerPartition.map { case (topicPartition, records) =>
-      
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
+    entriesPerPartition.map { case (topicIdPartition, records) =>
+      
brokerTopicStats.topicStats(topicIdPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
-        (topicPartition, LogAppendResult(
+      if (Topic.isInternal(topicIdPartition.topic) && !internalTopicsAllowed) {
+        (topicIdPartition, LogAppendResult(
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-          Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicPartition.topic}")),
+          Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicIdPartition.topic}")),
           hasCustomErrorMessage = false))
       } else {
         try {
-          val partition = getPartitionOrException(topicPartition)
+          val partition = 
getPartitionOrException(topicIdPartition.topicPartition())

Review Comment:
   In the long term, we probably want to consolidate everything to finding 
partition by topicId. In the interim, perhaps it's useful to further verify 
that the topicId in the input matches the one in `Partition`. We could create a 
new method like `partitionOrException` that does this verification and start 
using it where the caller has topicId.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2468,7 +2483,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             // Otherwise, the regular appendRecords path is used for all the 
non __consumer_offsets
             // partitions or for all partitions when the new group coordinator 
is disabled.
-            controlRecords += partition -> 
MemoryRecords.withEndTransactionMarker(
+            controlRecords += replicaManager.topicIdPartition(partition) -> 
MemoryRecords.withEndTransactionMarker(

Review Comment:
   `replicaManager.topicIdPartition` could have zero for topicId. Should we 
return an error in that case? Every caller to `replicaManager.appendRecords` 
needs to deal with this issue. Is it better to deal with this inside 
`replicaManager.appendRecords` or in every caller?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -611,7 +612,10 @@ private void handleProduceResponse(ClientResponse 
response, Map<TopicPartition,
                 // This will be set by completeBatch.
                 Map<TopicPartition, Metadata.LeaderIdAndEpoch> 
partitionsWithUpdatedLeaderInfo = new HashMap<>();
                 produceResponse.data().responses().forEach(r -> 
r.partitionResponses().forEach(p -> {
-                    TopicPartition tp = new TopicPartition(r.name(), 
p.index());
+
+                    // Version 12 drop topic name and add support to topic id. 
However, metadata can be used to map topic id to topic name.
+                    String topicName = 
metadata.topicNames().getOrDefault(r.topicId(), r.name());

Review Comment:
   Hmm, this doesn't work for responses of V11 or below, right? Perhaps we 
should check topicId is not zero before using it.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -623,40 +623,55 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val unauthorizedTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val nonExistingTopicResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val invalidRequestResponses = mutable.Map[TopicPartition, 
PartitionResponse]()
-    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
+    val unauthorizedTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val nonExistingTopicResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val invalidRequestResponses = mutable.Map[TopicIdPartition, 
PartitionResponse]()
+    val authorizedRequestInfo = mutable.Map[TopicIdPartition, MemoryRecords]()
+    val topicIdToPartitionData = new mutable.ArrayBuffer[(TopicIdPartition, 
ProduceRequestData.PartitionProduceData)]
+
+    produceRequest.data.topicData.forEach { topic =>
+      topic.partitionData.forEach { partition =>
+        val (topicName, topicId) = if (topic.topicId().equals(Uuid.ZERO_UUID)) 
{
+          (topic.name(), metadataCache.getTopicId(topic.name()))
+        } else {
+          (metadataCache.getTopicName(topic.topicId).getOrElse(topic.name), 
topic.topicId())
+        }
+
+        val topicPartition = new TopicPartition(topicName, partition.index())
+        if (topicName.isEmpty)

Review Comment:
   Should we check that topicId is zero here too?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -866,17 +872,17 @@ class ReplicaManager(val config: KafkaConfig,
                 s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
               case _ => None
             }
-          topicPartition -> LogAppendResult(
+          new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), 
Uuid.ZERO_UUID), topicPartition) -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
             Some(customException.getOrElse(error.exception)),
             hasCustomErrorMessage = customException.isDefined
           )
       }
-      val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case 
(key, _) => !errorResults.contains(key) }
+      val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case 
(key, _) => !errorResults.exists(_._1.topicPartition() == key.topicPartition()) 
}

Review Comment:
   Could we just use `errorResults.contains(key)`?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2485,7 +2500,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           requestLocal = requestLocal,
           responseCallback = errors => {
             errors.foreachEntry { (tp, partitionResponse) =>
-              markerResults.put(tp, partitionResponse.error)
+              markerResults.put(tp.topicPartition(), partitionResponse.error)

Review Comment:
   tp => topicIdPartition?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -892,11 +897,15 @@ private void sendProduceRequest(long now, int 
destination, short acks, int timeo
             // which is supporting the new magic version to one which doesn't, 
then we will need to convert.
             if (!records.hasMatchingMagic(minUsedMagic))
                 records = batch.records().downConvert(minUsedMagic, 0, 
time).records();
-            ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
+            ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic(), 
topicIds.get(tp.topic()));
+
             if (tpData == null) {
-                tpData = new 
ProduceRequestData.TopicProduceData().setName(tp.topic());
+                Uuid topicId = metadata.topicIds().getOrDefault(tp.topic(), 
Uuid.ZERO_UUID);

Review Comment:
   Could we just get topicId from topicIds once and reuse it here?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1447,15 +1453,15 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
                    _: KafkaStorageException) =>
-            (topicPartition, 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), 
hasCustomErrorMessage = false))
+            (topicIdPartition, 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), 
hasCustomErrorMessage = false))
           case rve: RecordValidationException =>
-            val logStartOffset = processFailedRecord(topicPartition, 
rve.invalidException)
+            val logStartOffset = 
processFailedRecord(topicIdPartition.topicPartition(), rve.invalidException)

Review Comment:
   Could we change `processFailedRecord()` to take topicIdPartition instead of 
topicPartition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to