showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747077



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       OK

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for 
${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here I can just call it with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here, and in the other call sites as well, I can just call it with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: 
core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced KIP-101 when documenting it. After your reminder, I found that the KIP is wrong: the description says "Start offset", but the table below says "end offset". I confirmed this is a typo and have updated the KIP as well. Thank you.
   
   
   
![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer 
exist
+   * Update checkpoint file to remove topics and partitions that no longer 
exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, 
topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {

Review comment:
       I added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. Truncation of the checkpoint file happens only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. Truncation of the checkpoint file happens only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. I just added an equals sign (<=) to make it more accurate. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       OK

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for 
${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here I can just call it with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here, and in the other call sites as well, I can just call it with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: 
core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced KIP-101 when documenting it. After your reminder, I found that the KIP is wrong: the description says "Start offset", but the table below says "end offset". I confirmed this is a typo and have updated the KIP as well. Thank you.
   
   
   
![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer 
exist
+   * Update checkpoint file to remove topics and partitions that no longer 
exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, 
topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {

Review comment:
       I added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. Truncation of the checkpoint file happens only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file is truncated 
only when the provided offset is smaller than the one in the 
checkpoint file. So the comment is correct. I just added an equal sign (<=) to 
make it more accurate. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for 
${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter 
`partitionToUpdateOrAdd` `None`, so here I can just call it with 2 params: 
`updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = 
Some(topicPartition))`. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the 
source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter 
`partitionToUpdateOrAdd` `None`, so here I can just call it with 2 params: 
`updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = 
Some(topicPartition))`, and I did the same in other places. Thanks.

##########
File path: 
core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced KIP-101 to document it. After your 
reminder, I found that the KIP is wrong: the description says it's "Start 
offset", but the table below says "end offset". I confirmed this is a 
typo and updated the KIP as well. Thank you.
   
   
   
![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer 
exist
+   * Update checkpoint file to remove topics and partitions that no longer 
exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, 
topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {

Review comment:
       I added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file is truncated 
only when the provided offset is smaller than the one in the 
checkpoint file. So the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file is truncated 
only when the provided offset is smaller than the one in the 
checkpoint file. So the comment is correct. I just added an equal sign (<=) to 
make it more accurate. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer 
exist
+   * Update checkpoint file to remove topics and partitions that no longer 
exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, 
topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer 
exist
+   * Update checkpoint file to remove topics and partitions that no longer 
exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, 
topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: 
Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: 
Option[TopicPartition] = None): Unit = {

Review comment:
       I added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file is truncated 
only when the provided offset is smaller than the one in the 
checkpoint file. So the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing 
updateCheckpoints
+    assertNotEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after 
doing updateCheckpoints
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = 
Some(topicPartition))
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and 
logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is 
still there
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, 
key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, 
cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < 
the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file is truncated 
only when the provided offset is smaller than the one in the 
checkpoint file. So the comment is correct. I just added an equal sign (<=) to 
make it more accurate. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to