chia7712 commented on code in PR #19226: URL: https://github.com/apache/kafka/pull/19226#discussion_r2021394671
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1282,8 +1281,31 @@ class ReplicaManager(val config: KafkaConfig, } if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) { + def onAcks(topicPartition: TopicPartition, status: DeleteRecordsPartitionStatus): Unit = { + val (lowWatermarkReached, error, lw) = getPartition(topicPartition) match { + case HostedPartition.Online(partition) => + partition.leaderLogIfLocal match { + case Some(_) => + val leaderLW = partition.lowWatermarkIfLeader + (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW) + case None => + (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + } + + case HostedPartition.Offline(_) => + (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + + case HostedPartition.None => + (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + } + if (error != Errors.NONE || lowWatermarkReached) { + status.setAcksPending(false) + status.responseStatus.setErrorCode(error.code) + status.responseStatus.setLowWatermark(lw) + } + } // create delayed delete records operation - val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback) + val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava, onAcks , response => responseCallback(response.asScala)) Review Comment: this is not addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org