junrao commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1757542203
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -846,17 +852,16 @@ class ReplicaManager(val config: KafkaConfig, s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")) case _ => None } - topicPartition -> LogAppendResult( + new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), Uuid.ZERO_UUID), topicPartition) -> LogAppendResult( Review Comment: Hmm, if we don't set topicId, we can't use it to correctly filter entriesPerPartition later, right? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -806,20 +811,21 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, internalTopicsAllowed: Boolean, transactionalId: String, - entriesPerPartition: Map[TopicPartition, MemoryRecords], - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), + entriesPerPartition: Map[TopicIdPartition, MemoryRecords], + responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, + recordValidationStatsCallback: Map[TopicIdPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, actionQueue: ActionQueue = this.defaultActionQueue, transactionSupportedOperation: TransactionSupportedOperation): Unit = { val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() + val topicIds = entriesPerPartition.keys.map(tp => tp.topic() -> tp.topicId()).toMap entriesPerPartition.forKeyValue { (topicPartition, records) => // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. 
val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) - if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) + if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition.topicPartition(), records.firstBatch.baseSequence) Review Comment: Perhaps rename topicPartition to topicIdPartition? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -892,11 +901,22 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo // which is supporting the new magic version to one which doesn't, then we will need to convert. if (!records.hasMatchingMagic(minUsedMagic)) records = batch.records().downConvert(minUsedMagic, 0, time).records(); - ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic()); + ProduceRequestData.TopicProduceData tpData = canUseTopicId ? + tpd.find(tp.topic(), topicIds.get(tp.topic())) : + tpd.find(new ProduceRequestData.TopicProduceData().setName(tp.topic())); + if (tpData == null) { - tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic()); + tpData = new ProduceRequestData.TopicProduceData(); + + if (canUseTopicId) { + tpData.setTopicId(topicIds.get(tp.topic())); Review Comment: If canUseTopicId is true, we call `find()` with a non-empty topic name and topicId. Here, however, we add an entry with just the topicId. Shouldn't we use the same key for both the `find()` call and the entry we add? 
########## clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java: ########## @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.stream.Collectors; + Review Comment: extra new line ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1402,9 +1407,9 @@ class ReplicaManager(val config: KafkaConfig, hasCustomErrorMessage = false)) } else { try { - val partition = getPartitionOrException(topicPartition) + val partition = getPartitionOrException(topicPartition.topicPartition()) Review Comment: Perhaps rename topicPartition to topicIdPartition? ########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -923,6 +943,15 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } + private Map<String, Uuid> getTopicIdsFromBatches(List<ProducerBatch> batches) { Review Comment: The topic ids don't really come from the batches. Perhaps rename it to something like `getTopicIdsForBatches`? ########## core/src/main/scala/kafka/server/DelayedProduce.scala: ########## @@ -94,7 +94,7 @@ class DelayedProduce(delayMs: Long, trace(s"Checking produce satisfaction for $topicPartition, current status $status") // skip those partitions that have already been satisfied if (status.acksPending) { - val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match { + val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition.topicPartition()) match { Review Comment: Perhaps rename topicPartition to topicIdPartition? Ditto in `onExpiration()`. 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -739,7 +754,9 @@ class KafkaApis(val requestChannel: RequestChannel, transactionalId = produceRequest.transactionalId, entriesPerPartition = authorizedRequestInfo, responseCallback = sendResponseCallback, - recordValidationStatsCallback = processingStatsCallback, + recordValidationStatsCallback = result => processingStatsCallback(result.map { + case (partition, response) => (partition.topicPartition(), response) Review Comment: This adds a bit more complexity. Could we just change processingStatsCallback to take TopicIdPartition instead? ########## clients/src/main/java/org/apache/kafka/clients/ApiVersions.java: ########## @@ -34,15 +34,27 @@ public class ApiVersions { private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>(); private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE; + private short maxSupportedProduceVersion = ApiKeys.PRODUCE.latestVersion(); public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) { this.nodeApiVersions.put(nodeId, nodeApiVersions); this.maxUsableProduceMagic = computeMaxUsableProduceMagic(); + this.maxSupportedProduceVersion = computeMaxSupportedProduceVersion(); + } + + private short computeMaxSupportedProduceVersion() { Review Comment: There is a bit of a chicken-and-egg problem. nodeApiVersions is only populated after the client receives the initial ApiVersionResponse during connection creation. However, when the sender generates a produce request, the connection to the leader may not have been established yet. So the sender may not know the full nodeApiVersions yet. We already have the following generic way of selecting the version of a request in NetworkClient. Perhaps that's enough. ``` NodeApiVersions versionInfo = apiVersions.get(nodeId); short version; // Note: if versionInfo is null, we have no server version information. 
This would be // the case when sending the initial ApiVersionRequest which fetches the version // information itself. It is also the case when discoverBrokerVersions is set to false. if (versionInfo == null) { version = builder.latestAllowedVersion(); if (discoverBrokerVersions && log.isTraceEnabled()) log.trace("No version information found when sending {} with correlation id {} to node {}. " + "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version); } else { version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org