junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r440539291



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -595,8 +595,10 @@ private[log] class Cleaner(val id: Int,
       log.replaceSegments(List(cleaned), segments)
     } catch {
       case e: LogCleaningAbortedException =>
-        try cleaned.deleteIfExists()
-        catch {
+        try {
+          cleaned.deleteIfExists()
+          log.producerStateManager.deleteIfExists(cleaned.baseOffset)

Review comment:
       Hmm, the cleaned segment has the same base offset as the first of the segments being cleaned. So, we don't want to delete that snapshot file.
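The Scala sketch below makes the collision concrete. It assumes snapshot files are named after an offset zero-padded to 20 digits, followed by a `.snapshot` suffix. `Segment`, `snapshotFileName` and `collidingSegment` are hypothetical names used only for illustration. They are not Kafka APIs.

```scala
object SnapshotNaming {
  // Hypothetical stand-in for a log segment; only the base offset matters here.
  final case class Segment(baseOffset: Long)

  // Assumption: a snapshot file name is the offset zero-padded to 20 digits
  // plus ".snapshot", e.g. 00000000000000000100.snapshot.
  def snapshotFileName(offset: Long): String = f"$offset%020d.snapshot"

  // Hypothetical helper: finds the old segment (if any) whose snapshot file
  // has the same name as the one derived from the cleaned segment's base offset.
  def collidingSegment(oldSegments: Seq[Segment], cleaned: Segment): Option[Segment] =
    oldSegments.find(s => snapshotFileName(s.baseOffset) == snapshotFileName(cleaned.baseOffset))
}
```

An aborted cleaning leaves the old segments in place. Under this naming assumption, deleting the snapshot for `cleaned.baseOffset` would remove the file that the first old segment still relies on.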

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -751,6 +751,25 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       None
   }
 
-  private def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir)
+  private[log] def listSnapshotFiles: Seq[File] = ProducerStateManager.listSnapshotFiles(logDir)
 
+  /**
+   * Remove any producer state snapshot files which do not have a corresponding offset provided
+   * in keepOffsets. The latest snapshot file will always be kept.

Review comment:
       What's keepOffsets?
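One way to read the doc comment is as a function that takes the set of offsets to keep as an explicit parameter. The sketch below follows that reading. `SnapshotRetention`, `snapshotsToRemove` and the `keepOffsets` parameter are hypothetical, and it works on offsets rather than files.

```scala
import scala.collection.immutable.SortedSet

object SnapshotRetention {
  // Hypothetical: returns the snapshot offsets to delete. A snapshot is removed
  // unless its offset is in keepOffsets; the latest snapshot is always retained.
  def snapshotsToRemove(snapshotOffsets: SortedSet[Long], keepOffsets: Set[Long]): SortedSet[Long] =
    snapshotOffsets.lastOption match {
      case None         => SortedSet.empty[Long]
      case Some(latest) => snapshotOffsets.filter(o => o != latest && !keepOffsets.contains(o))
    }
}
```

For example, with snapshots at 0, 100, 200 and 300 and `keepOffsets = Set(100)`, this sketch removes 0 and 200. It keeps 100 because 100 was requested, and it keeps 300 because 300 is the latest snapshot.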

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2421,6 +2396,7 @@ class Log(@volatile private var _dir: File,
         newSegments.foreach { splitSegment =>
           splitSegment.close()
           splitSegment.deleteIfExists()
+          producerStateManager.deleteIfExists(splitSegment.baseOffset)

Review comment:
       It doesn't seem that we generate producer snapshot files for those new segments?
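If nothing writes a snapshot for a split segment, the extra delete most likely finds no file. The sketch below shows this under two assumptions. First, the cleanup behaves like `java.nio.file.Files.deleteIfExists`. Second, snapshot files use the 20-digit `<offset>.snapshot` naming. `SplitCleanupSketch` and `cleanupSnapshot` are hypothetical.

```scala
import java.nio.file.{Files, Path}

object SplitCleanupSketch {
  // Hypothetical: deletes the snapshot file for baseOffset if one exists and
  // reports whether anything was actually deleted.
  def cleanupSnapshot(dir: Path, baseOffset: Long): Boolean =
    Files.deleteIfExists(dir.resolve(f"$baseOffset%020d.snapshot"))
}
```

On a directory where no snapshot was written for the split segment's base offset, the call returns `false`. It is harmless, but it would not clean anything up.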

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2237,7 +2209,10 @@ class Log(@volatile private var _dir: File,
     def deleteSegments(): Unit = {
       info(s"Deleting segments ${segments.mkString(",")}")
       maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
-        segments.foreach(_.deleteIfExists())
+        segments.foreach { segment =>
+          segment.deleteIfExists()
+          producerStateManager.deleteIfExists(segment.baseOffset)

Review comment:
       Hmm, this can be a bit tricky. When we replace old segments with a new segment in LogCleaner, each of the old segments will be deleted. However, the first old segment has the same base offset as the new segment. So, we can't simply delete the producer snapshot corresponding to the first old segment.
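The rule can be sketched in Scala as follows: when deleting replaced segments, skip any snapshot whose offset is still the base offset of a new segment. `ReplaceSegmentsSketch`, `Segment` and `snapshotOffsetsToDelete` are hypothetical names, not the actual `Log` code.

```scala
object ReplaceSegmentsSketch {
  // Hypothetical stand-in for a log segment; only the base offset matters here.
  final case class Segment(baseOffset: Long)

  // Hypothetical: base offsets whose snapshots may be deleted once newSegments
  // replace oldSegments. Offsets reused by a new segment are excluded.
  def snapshotOffsetsToDelete(oldSegments: Seq[Segment], newSegments: Seq[Segment]): Seq[Long] = {
    val stillInUse = newSegments.map(_.baseOffset).toSet
    oldSegments.map(_.baseOffset).filterNot(stillInUse.contains)
  }
}
```

For example, when cleaning replaces segments at 0, 100 and 200 with one cleaned segment at 0, only the snapshots at 100 and 200 are deleted.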




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

