dajac commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1872935017
########## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ########## @@ -165,4 +166,21 @@ class CoordinatorPartitionWriter( // Required offset. partitionResult.lastOffset + 1 } + + override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): Unit = { + var deleteResults: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty + replicaManager.deleteRecords( + timeout = 0L, Review Comment: You cannot compare with append records in this case because they don't work the sam way. In append records, the key is that we use `requiredAcks = 1` which means that the operation does not go to the purgatory at all. In the delete record case, it goes to the purgatory in all cases. With the timeout, I believe that the following code will actually throw a `IllegalStateException` because the operation in the purgatory won't be completed when you reach it. ``` val partitionResult = deleteResults.getOrElse(tp, throw new IllegalStateException(s"Delete status $deleteResults should have partition $tp.")) ``` Could you please double check? One option may be to make delete records return a future and complete it when the callback returns. This would also allow you to not trigger the next purge while the current one is not completed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org