dajac commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1872935017


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -165,4 +166,21 @@ class CoordinatorPartitionWriter(
     // Required offset.
     partitionResult.lastOffset + 1
   }
+
+  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): Unit = {
+    var deleteResults: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty
+    replicaManager.deleteRecords(
+      timeout = 0L,

Review Comment:
   You cannot compare this with append records, because they don't work the same way. For append records, the key is that we use `requiredAcks = 1`, which means the operation does not go to the purgatory at all. In the delete records case, it goes to the purgatory in all cases.
   
   With a timeout of zero, I believe the following code will actually throw an `IllegalStateException`, because the operation in the purgatory won't be completed yet when you reach it:
   
   ```
       val partitionResult = deleteResults.getOrElse(tp,
         throw new IllegalStateException(s"Delete status $deleteResults should have partition $tp."))
   ```
   
   Could you please double check?
   
   One option may be to make delete records return a future and complete it when the callback fires. This would also allow you to avoid triggering the next purge while the current one is still in flight.
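   To illustrate, a minimal sketch of that future-based option. This is not the real `ReplicaManager` API; the callback shape and all names here are assumptions for illustration only:
   
   ```scala
   import java.util.concurrent.CompletableFuture
   
   // Hypothetical sketch: deleteRecords returns a future that is completed from
   // the asynchronous callback, so the caller can wait on (or chain off) the
   // result instead of reading the results map before the purgatory operation
   // has completed.
   object DeleteRecordsSketch {
     // Stand-in for ReplicaManager.deleteRecords, which reports its result via a
     // callback once the delayed operation completes. Here it completes
     // immediately to keep the sketch self-contained.
     def replicaManagerDeleteRecords(deleteBeforeOffset: Long)(callback: Either[Throwable, Long] => Unit): Unit =
       callback(Right(deleteBeforeOffset))
   
     def deleteRecords(deleteBeforeOffset: Long): CompletableFuture[Long] = {
       val future = new CompletableFuture[Long]()
       replicaManagerDeleteRecords(deleteBeforeOffset) {
         case Right(lowWatermark) => future.complete(lowWatermark)
         case Left(error)         => future.completeExceptionally(error)
       }
       future
     }
   }
   ```
   
   The caller could then check `isDone` (or chain with `thenRun`) before scheduling the next purge, which addresses the overlapping-purge concern as well.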



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
