junrao commented on a change in pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#discussion_r687993862
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging {
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(
- // we do not clean beyond the first unstable offset
- log.firstUnstableOffset,
+ // we do not clean beyond the last stable offset
Review comment:
This is an existing issue. But could we update the comment in line 593
to include last stable offset too?
##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging {
while(log.numberOfSegments < 8)
log.appendAsLeader(records(log.logEndOffset.toInt,
log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+ log.updateHighWatermark(50)
+
+ val lastCleanOffset = Some(0L)
+ val cleanableOffsets = LogCleanerManager.cleanableOffsets(log,
lastCleanOffset, time.milliseconds)
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable
offset starts at the beginning of the log.")
+ assertEquals(log.highWatermark,
cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is
bounded by the hwm.")
Review comment:
Since the description of the test says bounded by LSO, should we change
log.highWatermark to log.lastStableOffset and the error message accordingly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]