junrao commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1757542203


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -846,17 +852,16 @@ class ReplicaManager(val config: KafkaConfig,
                 s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
               case _ => None
             }
-          topicPartition -> LogAppendResult(
+          new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), 
Uuid.ZERO_UUID), topicPartition) -> LogAppendResult(

Review Comment:
   Hmm, if we don't set topicId, we can't use it to correctly filter entriesPerPartition later, right?
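
   To make the concern concrete, here is a standalone sketch (not code from this PR; the class and topic names are made up): `TopicIdPartition` equality includes the topic id, so a key built with `Uuid.ZERO_UUID` won't match an `entriesPerPartition` key carrying the real id.

   ```
   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.Uuid;

   public class TopicIdKeyDemo {
       public static void main(String[] args) {
           TopicPartition tp = new TopicPartition("foo", 0); // hypothetical topic
           Uuid realId = Uuid.randomUuid();

           // equals() compares the topic id as well as the partition, so a
           // ZERO_UUID key and a real-id key for the same partition differ.
           TopicIdPartition withRealId = new TopicIdPartition(realId, tp);
           TopicIdPartition withZeroId = new TopicIdPartition(Uuid.ZERO_UUID, tp);
           System.out.println(withRealId.equals(withZeroId)); // prints false
       }
   }
   ```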



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -806,20 +811,21 @@ class ReplicaManager(val config: KafkaConfig,
                           requiredAcks: Short,
                           internalTopicsAllowed: Boolean,
                           transactionalId: String,
-                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                          responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                          recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
+                          entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
+                          responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+                          recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (),
                           requestLocal: RequestLocal = RequestLocal.NoCaching,
                           actionQueue: ActionQueue = this.defaultActionQueue,
                           transactionSupportedOperation: TransactionSupportedOperation): Unit = {
 
     val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
     val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
+    val topicIds = entriesPerPartition.keys.map(tp => tp.topic() -> tp.topicId()).toMap
     entriesPerPartition.forKeyValue { (topicPartition, records) =>
       // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
       val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
       transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
-      if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
+      if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition.topicPartition(), records.firstBatch.baseSequence)

Review Comment:
   Perhaps rename topicPartition to topicIdPartition?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -892,11 +901,22 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
             // which is supporting the new magic version to one which doesn't, then we will need to convert.
             if (!records.hasMatchingMagic(minUsedMagic))
                 records = batch.records().downConvert(minUsedMagic, 0, time).records();
-            ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
+            ProduceRequestData.TopicProduceData tpData = canUseTopicId ?
+                    tpd.find(tp.topic(), topicIds.get(tp.topic())) :
+                    tpd.find(new ProduceRequestData.TopicProduceData().setName(tp.topic()));
+
             if (tpData == null) {
-                tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
+                tpData = new ProduceRequestData.TopicProduceData();
+
+                if (canUseTopicId) {
+                    tpData.setTopicId(topicIds.get(tp.topic()));

Review Comment:
   If canUseTopicId is true, we call `find()` with a non-empty topic name and topicId. Here, we add an entry with just the topicId. It seems that we should use the same key between the `find()` call and adding the entry?
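
   A sketch of what keeping the keys consistent could look like (assuming the generated `TopicProduceData` collection keys entries on both the name and the id; this is illustrative, not the PR's final code):

   ```
   ProduceRequestData.TopicProduceData tpData = canUseTopicId
           ? tpd.find(tp.topic(), topicIds.get(tp.topic()))
           : tpd.find(new ProduceRequestData.TopicProduceData().setName(tp.topic()));

   if (tpData == null) {
       // Build the new entry with the same key fields the find() above used,
       // so later lookups for this topic hit this entry.
       tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
       if (canUseTopicId)
           tpData.setTopicId(topicIds.get(tp.topic()));
       tpd.add(tpData);
   }
   ```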



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##########
@@ -33,6 +33,7 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+

Review Comment:
   extra new line



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1402,9 +1407,9 @@ class ReplicaManager(val config: KafkaConfig,
           hasCustomErrorMessage = false))
       } else {
         try {
-          val partition = getPartitionOrException(topicPartition)
+          val partition = getPartitionOrException(topicPartition.topicPartition())

Review Comment:
   Perhaps rename topicPartition to topicIdPartition?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -923,6 +943,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
     }
 
+    private Map<String, Uuid> getTopicIdsFromBatches(List<ProducerBatch> batches) {

Review Comment:
   The topic id is not really from the batches. Perhaps rename to sth like 
`getTopicIdsForBatches` ?



##########
core/src/main/scala/kafka/server/DelayedProduce.scala:
##########
@@ -94,7 +94,7 @@ class DelayedProduce(delayMs: Long,
       trace(s"Checking produce satisfaction for $topicPartition, current 
status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
+        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition.topicPartition()) match {

Review Comment:
   Perhaps rename topicPartition to topicIdPartition? Ditto in `onExpiration()`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -739,7 +754,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         transactionalId = produceRequest.transactionalId,
         entriesPerPartition = authorizedRequestInfo,
         responseCallback = sendResponseCallback,
-        recordValidationStatsCallback = processingStatsCallback,
+        recordValidationStatsCallback = result => processingStatsCallback(result.map {
+          case (partition, response) => (partition.topicPartition(), response)

Review Comment:
   This adds a bit more complexity. Could we just change 
processingStatsCallback to take TopicIdPartition instead?



##########
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##########
@@ -34,15 +34,27 @@ public class ApiVersions {
 
     private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
     private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+    private short maxSupportedProduceVersion = ApiKeys.PRODUCE.latestVersion();
 
     public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
         this.nodeApiVersions.put(nodeId, nodeApiVersions);
         this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
+        this.maxSupportedProduceVersion = computeMaxSupportedProduceVersion();
+    }
+
+    private short computeMaxSupportedProduceVersion() {

Review Comment:
   There is a bit of chicken-and-egg problem. nodeApiVersions is only populated 
after the client receives the initial ApiVersionResponse during connection 
creation. However, when the sender generates a produce request, it's possible 
that the connection to the leader hasn't been established yet. So the sender 
doesn't know the full nodeApiVersions yet. 
   
   We already have the following generic way of selecting the version of a 
request in NetworkClient. Perhaps that's enough.
   
   ```
               NodeApiVersions versionInfo = apiVersions.get(nodeId);
               short version;
               // Note: if versionInfo is null, we have no server version information. This would be
               // the case when sending the initial ApiVersionRequest which fetches the version
               // information itself.  It is also the case when discoverBrokerVersions is set to false.
               if (versionInfo == null) {
                   version = builder.latestAllowedVersion();
                   if (discoverBrokerVersions && log.isTraceEnabled())
                       log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                               "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
               } else {
                   version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                           builder.latestAllowedVersion());
               }
   ```
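   
   For instance, instead of computing a max produce version in ApiVersions, the sender could hand the builder the full allowed range and let NetworkClient downgrade per connection. A hypothetical sketch (`minVersion` and `produceRequestData` stand in for values the sender already computes):
   
   ```
   // NetworkClient then picks builder.latestAllowedVersion() when it has no
   // version info for the node, and latestUsableVersion(...) once it does.
   ProduceRequest.Builder builder = new ProduceRequest.Builder(
           minVersion,                      // oldest version these records can be encoded with
           ApiKeys.PRODUCE.latestVersion(), // newest version the client supports
           produceRequestData);
   ```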


