chia7712 commented on code in PR #19793: URL: https://github.com/apache/kafka/pull/19793#discussion_r2128531545
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -909,12 +911,33 @@ class ReplicaManager(val config: KafkaConfig, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus], - responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, + responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit, ): Unit = { if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // create delayed produce operation - val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus) - val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback) + val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava) + + // Updates the status of a produce partition based on the current state. + // Please refer to the documentation in `DelayedProduce#tryComplete` for + // a comprehensive description of Case A, Case B and Case C. + def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = { Review Comment: It would be better to delegate the following code only. ``` replicaManager.getPartitionOrError(topicIdPartition.topicPartition()) match { case Left(err) => // Case A (false, err) case Right(partition) => partition.checkEnoughReplicasReachOffset(status.requiredOffset) } ``` Otherwise, it is hard to list all cases in one place. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org