gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501944185



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = 
{
+    var latestStraySnapshot: Option[SnapshotFile] = None
+    val ss = loadSnapshots()
+    for (snapshot <- ss.values().asScala) {
+      val key = snapshot.offset
+      latestStraySnapshot match {
+        case Some(prev) =>
+          if (!segmentBaseOffsets.contains(key)) {
+            // this snapshot is now the largest stray snapshot.
+            prev.deleteIfExists()
+            ss.remove(prev.offset)
+            latestStraySnapshot = Some(snapshot)
+          }
+        case None =>
+          if (!segmentBaseOffsets.contains(key)) {
+            latestStraySnapshot = Some(snapshot)

Review comment:
       We perform a check below which may cover this case. After setting the 
`snapshots` map, we look at the latest snapshot in the map. If the latest 
snapshot in the map is not equal to the `latestStraySnapshot`, we delete the 
`latestStraySnapshot`. 
   
   I think this is a bit confusing though, so it might be better if instead we 
directly check that the `latestStraySnapshot` is larger than the largest offset 
in `segmentBaseOffsets`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to