hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436166743



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file system.")
+          removeAndDeleteSegments(logSegments, asyncDelete = false)
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncate()

Review comment:
       Hmm, not sure about this. After KIP-360, we try to retain producer state 
as long as possible even when the corresponding entries have been removed from 
the log. However, we're in a strange state given that some of the later 
segments were apparently removed. Perhaps it is safer to treat this more like a 
new replica which is starting from scratch.
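The trade-off described above can be made concrete with a toy model. Everything here is hypothetical and is not a Kafka class. Both policies discard the segments and the leader-epoch entries when the log end offset is behind the log start offset. `RetainProducerState` keeps producer entries in the spirit of KIP-360, while `ResetLikeNewReplica` clears them so the partition looks like a replica starting from scratch. The diff's `producerStateManager.truncate()` call corresponds roughly to the second policy. The fresh empty segment at `logStartOffset` is an assumption.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the per-partition state touched by this hunk.
sealed trait ProducerStatePolicy
case object RetainProducerState extends ProducerStatePolicy // KIP-360 style: keep entries as long as possible
case object ResetLikeNewReplica extends ProducerStatePolicy // "start from scratch" alternative

final class PartitionStateModel {
  val segmentBaseOffsets = mutable.ArrayBuffer.empty[Long]
  val epochEntries = mutable.ArrayBuffer.empty[(Int, Long)] // (leaderEpoch, startOffset)
  val producerLastOffsets = mutable.Map.empty[Long, Long]   // producerId -> last offset

  /** Handles logEndOffset < logStartOffset and returns the new log end offset. */
  def onLogEndBehindStart(logStartOffset: Long, policy: ProducerStatePolicy): Long = {
    segmentBaseOffsets.clear() // all segment data is discarded
    epochEntries.clear()       // epoch history no longer matches any data
    policy match {
      case RetainProducerState => ()
      case ResetLikeNewReplica => producerLastOffsets.clear()
    }
    // Assumption: a new, empty active segment is opened at logStartOffset.
    segmentBaseOffsets += logStartOffset
    logStartOffset
  }
}
```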

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {

Review comment:
       nit: this is a big initializer. Are there parts we could move to a 
method?
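One possible shape for that refactor is a pure helper. `validatedLogEndOffset` and its parameters are hypothetical names. The side effects of the invalid branch (warning, deleting segments, clearing the epoch cache, truncating producer state) are passed in as a callback. The empty-log branch is assumed to yield `None`, since that part of the initializer is not shown in the hunk.

```scala
// Hypothetical helper extracted from the logEndOffsetOption initializer.
object LogEndOffsetCheck {
  /**
   * Returns Some(logEndOffset) when the active segment's next offset is not behind
   * logStartOffset. Otherwise runs onInvalid (warn + delete in the real code) and
   * returns None. Also returns None when there are no segments (assumed behaviour).
   */
  def validatedLogEndOffset(
      segmentsNonEmpty: Boolean,
      readNextOffset: () => Long,
      logStartOffset: Long)(onInvalid: Long => Unit): Option[Long] =
    if (!segmentsNonEmpty) None
    else {
      val logEndOffset = readNextOffset()
      if (logEndOffset >= logStartOffset) Some(logEndOffset)
      else {
        onInvalid(logEndOffset)
        None
      }
    }
}
```

Inside `Log` this could then read something like `validatedLogEndOffset(logSegments.nonEmpty, () => activeSegment.readNextOffset, logStartOffset) { leo => ... }`, which reduces the initializer to a single call.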

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment:
       Can you help me understand what was wrong with this? 
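Comparing the old and new recovery-point rules from this PR side by side may help with the question. The sketch below models both as pure functions. The parameter names mirror the diff. `logEndOffset` is passed in explicitly, which sidesteps, rather than answers, the initialization-order question raised on the later hunk. Per the PR's own code comment, the intent appears to be that outside a clean shutdown the recovery point is only clamped, and it is advanced only in `flush(Long)`.

```scala
// Simplified model of the two recovery-point rules in this PR; not Kafka code.
object RecoveryPointRules {
  /** Old rule: unconditionally jump to the active segment's next offset. */
  def oldRule(activeSegmentNextOffset: Long): Long = activeSegmentNextOffset

  /**
   * New rule from the revised hunk: after a clean shutdown, move to the log end offset
   * (if segments survived); otherwise only clamp so recoveryPoint never exceeds it.
   */
  def newRule(
      hasCleanShutdownFile: Boolean,
      recoveryPoint: Long,
      logEndOffsetOption: Option[Long],
      logEndOffset: Long): Long =
    if (hasCleanShutdownFile) logEndOffsetOption.getOrElse(recoveryPoint)
    else math.min(recoveryPoint, logEndOffset)
}
```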

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and we did not perform any changes to
+    // the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
+    // the recovery point in flush(Long).
+    if (hasCleanShutdownFile)
+      logEndOffsetOption.foreach(recoveryPoint = _)
+    else
+      recoveryPoint = Math.min(recoveryPoint, logEndOffset)

Review comment:
       Hmm, `logEndOffset` is defined by `nextOffsetMetadata`, which is 
initialized after `loadSegments` returns. But `recoverLog` is called within 
`loadSegments`. So does this check work as expected or am I missing something?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -360,7 +360,7 @@ class LogManager(logDirs: Seq[File],
       for ((cleanShutdownFile, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
         try {
-          cleanShutdownFile.delete()
+          Files.deleteIfExists(cleanShutdownFile.toPath)

Review comment:
       Just checking, but the issue here is that we might mistakenly mark the 
directory as offline if the clean shutdown file did not exist?
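The question turns on how the JDK deletion calls treat a missing file. The sketch below compares them on an absent marker file whose name is hypothetical:

- `File.delete()` returns `false` rather than throwing.
- `Files.deleteIfExists` also returns `false`, and throws `IOException` only for genuine I/O failures.
- `Files.delete` throws `NoSuchFileException`.

Assume the surrounding `try` treats an `IOException` as a directory failure (that part of the hunk is not shown). Under that assumption, `deleteIfExists` should not take the directory offline merely because the file is absent, whereas a plain `Files.delete` would.

```scala
import java.nio.file.{Files, NoSuchFileException, Path}

object DeleteSemantics {
  /** Hypothetical helper: reports how each API treats a file that does not exist. */
  def compareOnMissingFile(dir: Path): Map[String, String] = {
    val missing: Path = dir.resolve("clean-shutdown-marker") // hypothetical name, never created
    val legacy = missing.toFile.delete()                     // java.io: returns false, no exception
    val ifExists = Files.deleteIfExists(missing)             // NIO: returns false when absent
    val strict =
      try { Files.delete(missing); "deleted" }
      catch { case _: NoSuchFileException => "NoSuchFileException" } // NIO: throws when absent
    Map(
      "File.delete" -> legacy.toString,
      "Files.deleteIfExists" -> ifExists.toString,
      "Files.delete" -> strict)
  }
}
```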

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -806,14 +806,20 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    if (logSegments.nonEmpty) {
-      val logEndOffset = activeSegment.readNextOffset
-      if (logEndOffset < logStartOffset) {
-        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-          "This could happen if segment files were deleted from the file system.")
-        removeAndDeleteSegments(logSegments, asyncDelete = true)
-      }
-    }
+    val logEndOffsetOption: Option[Long] =
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +

Review comment:
       I guess it is because of the semantics of DeleteRecords that we trust 
the checkpoint over the segment data. Might be worth a comment about that since 
it is a bit surprising.
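The rule such a comment would describe can be sketched as follows. DeleteRecords advances the log start offset (and its checkpoint) without necessarily removing segment files. On restart, the checkpointed value is therefore treated as authoritative. Segment data that ends before it is discarded rather than the start offset being lowered. All names are hypothetical. Bounding the start offset below by the first segment's base offset is an assumption about the surrounding code.

```scala
// Hypothetical model of "trust the checkpoint over the segment data"; not Kafka code.
object StartOffsetTrust {
  sealed trait Outcome
  final case class KeepSegments(logStartOffset: Long, logEndOffset: Long) extends Outcome
  final case class DiscardSegments(logStartOffset: Long) extends Outcome

  def reconcile(checkpointedStart: Long, firstSegmentBase: Long, segmentEnd: Long): Outcome = {
    // Never move the start offset below the checkpoint: DeleteRecords may have
    // advanced it past records that still sit in segment files.
    val logStartOffset = math.max(checkpointedStart, firstSegmentBase) // assumed bound
    if (segmentEnd >= logStartOffset) KeepSegments(logStartOffset, segmentEnd)
    else DiscardSegments(logStartOffset) // checkpoint wins; the segment data is dropped
  }
}
```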




----------------------------------------------------------------
This is an automated message from the Apache Git Service.