kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r759942807
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,25 +1498,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.takeSnapshot()
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
- scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+ scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
newSegment
}
/**
* Flush all local log segments
*
- * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+ * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
* we have to pass logEngOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
* active segments, which is important to make sure we don't lose the empty index file during shutdown.
Review comment:
IIUC, the main reason we would like to force-flush the active segment is
slightly different.
Instead of `which is important to make sure we don't lose the empty index
file during shutdown.`, could the comment say `which is important to make sure
we persist the active segment file during shutdown, particularly when it's
empty.`?
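As a rough illustration of why the `+ 1` matters, here is a minimal sketch,
assuming a simplified model in which a flush covers every segment whose base
offset is strictly below the given offset. `Segment`, `ToyLog` and `flushUpTo`
are hypothetical names made up for this example; they are not the
`UnifiedLog` or `LocalLog` API.

```scala
// Hypothetical, simplified model for illustration only; not Kafka's real classes.
final class Segment(val baseOffset: Long) {
  var flushed: Boolean = false
}

final class ToyLog(val segments: Vector[Segment], val logEndOffset: Long) {

  // Assumption: the upper bound is exclusive, so a segment is flushed only
  // when its base offset is strictly below `upTo`.
  def flushUpTo(upTo: Long): Unit =
    segments.filter(_.baseOffset < upTo).foreach(_.flushed = true)

  // An empty active segment has baseOffset == logEndOffset, so it is only
  // covered when the bound is pushed to logEndOffset + 1.
  def flush(forceFlushActiveSegment: Boolean): Unit = {
    val upTo = if (forceFlushActiveSegment) logEndOffset + 1 else logEndOffset
    flushUpTo(upTo)
  }
}
```

In this model, a log with segments at base offsets 0 and 10 and a log end
offset of 10 has an empty active segment. `flush(forceFlushActiveSegment = false)`
leaves that segment unflushed, while `flush(forceFlushActiveSegment = true)`
persists it, which is the behavior the doc comment should describe.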
##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
}
+ @Test
+ def testFlushingEmptyActiveSegments(): Unit = {
Review comment:
Sounds good.
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,25 +1498,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
producerStateManager.takeSnapshot()
updateHighWatermarkWithLogEndOffset()
// Schedule an asynchronous flush of the old segment
- scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+ scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
newSegment
}
/**
* Flush all local log segments
*
- * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+ * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
* we have to pass logEngOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
Review comment:
s/logEngOffset/logEndOffset