junrao commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2006193900


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+ *                              In this state, it can become None or LogCleaningPaused(2).
+ *                              Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+ *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
+                                            return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), entry.getValue()));

Review Comment:
   Could this just be ` return checkpoint.read().entrySet().stream();`?
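   
   For illustration, a minimal sketch of the method with that simplification applied, assuming `checkpoint.read()` returns a `Map<TopicPartition, Long>` as the surrounding collector suggests. The inner `map()` is redundant because `entrySet().stream()` already yields `Map.Entry<TopicPartition, Long>` values:
   
   ```
   // Sketch only: same behavior, without the redundant map() step.
   public Map<TopicPartition, Long> allCleanerCheckpoints() {
       return inLock(lock, () -> checkpoints.values().stream()
               .flatMap(checkpoint -> {
                   try {
                       // entrySet().stream() already produces Map.Entry<TopicPartition, Long>
                       return checkpoint.read().entrySet().stream();
                   } catch (KafkaStorageException e) {
                       LOG.error("Failed to access checkpoint file {} in dir {}",
                               checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
                       return Stream.empty();
                   }
               })
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
   }
   ```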



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogToClean.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
+/**
+ * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
+ * and whether it needs compaction immediately.
+ */
+public final class LogToClean implements Comparable<LogToClean> {
+    private final TopicPartition topicPartition;
+    private final UnifiedLog log;
+    private final long firstDirtyOffset;
+    private final long uncleanableOffset;
+    private final boolean needCompactionNow;
+    private final long cleanBytes;
+    private final long firstUncleanableOffset;
+    private final long cleanableBytes;
+    private final long totalBytes;
+    private final double cleanableRatio;
+
+    public LogToClean(
+            TopicPartition topicPartition,
+            UnifiedLog log,
+            long firstDirtyOffset,
+            long uncleanableOffset,
+            boolean needCompactionNow
+    ) {
+        this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition must not be null");
+        this.log = Objects.requireNonNull(log, "log must not be null");
+        this.firstDirtyOffset = firstDirtyOffset;
+        this.uncleanableOffset = uncleanableOffset;
+        this.needCompactionNow = needCompactionNow;
+
+        this.cleanBytes = log.logSegments(-1, firstDirtyOffset).stream()
+                .mapToLong(LogSegment::size)
+                .sum();
+
+        var cleanableBytesResult = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset);
+        this.firstUncleanableOffset = cleanableBytesResult.getKey();
+        this.cleanableBytes = cleanableBytesResult.getValue();
+
+        this.totalBytes = cleanBytes + cleanableBytes;
+        this.cleanableRatio = (double) cleanableBytes / totalBytes;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public UnifiedLog log() {
+        return log;
+    }
+
+    public long firstDirtyOffset() {
+        return firstDirtyOffset;
+    }
+
+    public long uncleanableOffset() {

Review Comment:
   This is unused?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+ *                              In this state, it can become None or LogCleaningPaused(2).
+ *                              Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+ *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
+                                            return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir {}",
+                                checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    /**
+     * Package private for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Package private for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We recompute this
+     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact)
+                    .filter(entry -> !(inProgress.containsKey(entry.getKey()) || isUncleanablePartition(entry.getValue(), entry.getKey())))
+                    .map(entry -> {
+                                // create a LogToClean instance for each
+                                TopicPartition topicPartition = entry.getKey();
+                                UnifiedLog log = entry.getValue();
+                                try {
+                                    Long lastCleanOffset = lastClean.get(topicPartition);
+                                    OffsetsToClean offsetsToClean = cleanableOffsets(log, Optional.ofNullable(lastCleanOffset), now);
+                                    // update checkpoint for logs with invalid checkpointed offsets
+                                    if (offsetsToClean.forceUpdateCheckpoint) {
+                                        updateCheckpoints(log.parentDirFile(), Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset)), Optional.empty());
+                                    }
+                                    long compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now);
+                                    preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
+
+                                    return new LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset,
+                                            offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0);
+                                } catch (Throwable e) {
+                                    throw new LogCleaningException(log, "Failed to calculate log cleaning stats for partition " + topicPartition, e);
+                                }
+                            }
+                    ).filter(ltc -> ltc.totalBytes() > 0) // skip any empty logs
+                    .toList();
+
+            this.dirtiestLogCleanableRatio = dirtyLogs.isEmpty()
+                    ? 0
+                    : dirtyLogs.stream()
+                    .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                    .map(LogToClean::cleanableRatio)
+                    .orElse(0.0);

Review Comment:
   Is the following a bit better?
   
   ```
   dirtyLogs.stream()
       .mapToDouble(LogToClean::cleanableRatio)
       .max()
       .orElse(0.0);
   ```
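   
   Applied to the assignment above, the `dirtyLogs.isEmpty()` ternary becomes unnecessary as well, since `max()` on an empty `DoubleStream` returns an empty `OptionalDouble`, which `orElse(0.0)` already covers. A sketch of the full assignment:
   
   ```
   // Sketch only: an empty dirtyLogs list falls through to orElse(0.0).
   this.dirtiestLogCleanableRatio = dirtyLogs.stream()
           .mapToDouble(LogToClean::cleanableRatio)
           .max()
           .orElse(0.0);
   ```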



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+ *                              In this state, it can become None or LogCleaningPaused(2).
+ *                              Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+ *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
+                                            return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir {}",
+                                checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    /**
+     * Package private for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Package private for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We recompute this
+     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;

Review Comment:
   We probably want to be consistent with the usage of `this`. Most of the time, we don't use `this`.
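   
   For example, the assignment above would simply become:
   
   ```
   timeOfLastRun = now;
   ```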



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning can be done in this state.
+ *                              In this state, it can become None or LogCleaningPaused(2).
+ *                              Valid previous state is None, LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No log cleaning can be done in this state.
+ *                              In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
+                                            return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir {}",
+                                checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    /**
+     * Package private for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Package private for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We recompute this
+     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact)
+                    .filter(entry -> !(inProgress.containsKey(entry.getKey()) || isUncleanablePartition(entry.getValue(), entry.getKey())))
+                    .map(entry -> {
+                                // create a LogToClean instance for each
+                                TopicPartition topicPartition = entry.getKey();
+                                UnifiedLog log = entry.getValue();
+                                try {
+                                    Long lastCleanOffset = lastClean.get(topicPartition);
+                                    OffsetsToClean offsetsToClean = cleanableOffsets(log, Optional.ofNullable(lastCleanOffset), now);
+                                    // update checkpoint for logs with invalid checkpointed offsets
+                                    if (offsetsToClean.forceUpdateCheckpoint) {
+                                        updateCheckpoints(log.parentDirFile(), Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset)), Optional.empty());
+                                    }
+                                    long compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now);
+                                    preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
+
+                                    return new LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset,
+                                            offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0);
+                                } catch (Throwable e) {
+                                    throw new LogCleaningException(log, "Failed to calculate log cleaning stats for partition " + topicPartition, e);
+                                }
+                            }
+                    ).filter(ltc -> ltc.totalBytes() > 0) // skip any empty logs
+                    .toList();
+
+            this.dirtiestLogCleanableRatio = dirtyLogs.isEmpty()
+                    ? 0
+                    : dirtyLogs.stream()
+                    .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                    .map(LogToClean::cleanableRatio)
+                    .orElse(0.0);
+            // and must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
+            List<LogToClean> cleanableLogs = dirtyLogs.stream()
+                    .filter(ltc -> (ltc.needCompactionNow() && ltc.cleanableBytes() > 0) || ltc.cleanableRatio() > ltc.log().config().minCleanableRatio)
+                    .toList();
+
+            if (cleanableLogs.isEmpty()) {
+                return Optional.empty();
+            } else {
+                preCleanStats.recordCleanablePartitions(cleanableLogs.size());
+                LogToClean filthiest = cleanableLogs.stream()
+                        .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                        .orElseThrow(() -> new IllegalStateException("No filthiest log found"));
+
+                inProgress.put(filthiest.topicPartition(), LogCleaningState.LogCleaningInProgress.getInstance());
+                return Optional.of(filthiest);
+            }
+        });
+    }
+
+    /**
+     * Pause logs cleaning for logs that do not have compaction enabled
+     * and do not have other deletion or compaction in progress.
+     * This is to handle potential race between retention and cleaner threads when users
+     * switch topic configuration between compacted and non-compacted topic.
+     *
+     * @return retention logs that have log cleaning successfully paused
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> pauseCleaningForNonCompactedPartitions() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs = logs.entrySet().stream()
+                    .filter(entry -> !entry.getValue().config().compact) // pick non-compacted logs
+                    .filter(entry -> !inProgress.containsKey(entry.getKey())) // skip any logs already in-progress
+                    .collect(Collectors.toList());
+
+            deletableLogs.forEach(entry -> inProgress.put(entry.getKey(), new LogCleaningState.LogCleaningPaused(1)));
+
+            return deletableLogs;
+        });
+    }
+
+    /**
+     * Find any logs that have compaction enabled. Mark them as being cleaned
+     * Include logs without delete enabled, as they may have segments
+     * that precede the start offset.
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> toClean = logs.entrySet().stream()
+                    .filter(entry -> {
+                        TopicPartition topicPartition = entry.getKey();
+                        UnifiedLog log = entry.getValue();
+                        return !inProgress.containsKey(topicPartition) && log.config().compact &&
+                                !isUncleanablePartition(log, topicPartition);
+                    })
+                    .collect(Collectors.toList());
+            toClean.forEach(entry -> inProgress.put(entry.getKey(), LogCleaningState.LogCleaningInProgress.getInstance()));
+            return toClean;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+     * the partition is aborted.
+     * This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
+     */
+    public void abortCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            abortAndPauseCleaning(topicPartition);
+            resumeCleaning(List.of(topicPartition));
+            return null;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+     * This call blocks until the cleaning of the partition is aborted and paused.
+     * 1. If the partition is not in progress, mark it as paused.
+     * 2. Otherwise, first mark the state of the partition as aborted.
+     * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
+     *    throws a LogCleaningAbortedException to stop the cleaning task.
+     * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
+     * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+     * 6. If the partition is already paused, a new call to this function
+     *    will increase the paused count by one.
+     */
+    public void abortAndPauseCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            LogCleaningState state = inProgress.get(topicPartition);
+
+            if (state == null) {
+                inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(1));
+            } else if (state instanceof LogCleaningState.LogCleaningInProgress) {
+                inProgress.put(topicPartition, LogCleaningState.LogCleaningAborted.getInstance());
+            } else if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
+                inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() + 1));
+            } else {
+                throw new IllegalStateException("Compaction for partition " + topicPartition +
+                        " cannot be aborted and paused since it is in " + state + " state.");
+            }
+
+            while (!isCleaningInStatePaused(topicPartition)) {
+                try {
+                    pausedCleaningCond.await(100, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Resume the cleaning of paused partitions.
+     * Each call of this function will undo one pause.
+     */
+    public void resumeCleaning(List<TopicPartition> topicPartitions) {
+        inLock(lock, () -> {
+            topicPartitions.forEach(topicPartition -> {
+                LogCleaningState state = inProgress.get(topicPartition);
+
+                if (state == null) {
+                    throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be resumed since it is not paused.");
+                }
+
+                if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
+                    if (logCleaningPaused.getPausedCount() == 1) {
+                        inProgress.remove(topicPartition);
+                    } else if (logCleaningPaused.getPausedCount() > 1) {
+                        inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() - 1));
+                    }
+                } else {
+                    throw new IllegalStateException("Compaction for partition 
" + topicPartition +
+                            " cannot be resumed since it is in " + state + " 
state.");
+                }
+            });
+
+            return null;
+        });
+    }
+
+    /**
+     * Check if the cleaning for a partition is in a particular state. The 
caller is expected to hold lock while making the call.
+     */
+    private boolean isCleaningInState(TopicPartition topicPartition, 
LogCleaningState expectedState) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state == expectedState;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is paused. The caller is expected 
to hold lock while making the call.
+     */
+    private boolean isCleaningInStatePaused(TopicPartition topicPartition) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state instanceof LogCleaningState.LogCleaningPaused;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is aborted. If so, throw an 
exception.
+     */
+    public void checkCleaningAborted(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            if (isCleaningInState(topicPartition, 
LogCleaningState.LogCleaningAborted.getInstance())) {
+                throw new LogCleaningAbortedException();
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Update checkpoint file, adding or removing partitions if necessary.
+     *
+     * @param dataDir                The File object to be updated
+     * @param partitionToUpdateOrAdd The [TopicPartition, Long] map data to be 
updated. pass "none" if doing remove, not add
+     * @param partitionToRemove      The TopicPartition to be removed
+     */
+    public void updateCheckpoints(
+            File dataDir,
+            Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
+            Optional<TopicPartition> partitionToRemove
+    ) {
+        inLock(lock, () -> {
+            OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
+            if (checkpoint != null) {
+                try {
+                    Map<TopicPartition, Long> currentCheckpoint = 
checkpoint.read().entrySet().stream()
+                            .filter(entry -> logs.containsKey(entry.getKey()))
+                            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                    // remove the partition offset if any
+                    Map<TopicPartition, Long> updatedCheckpoint = 
partitionToRemove.map(topicPartition -> {
+                        Map<TopicPartition, Long> newCheckpoint = new 
HashMap<>(currentCheckpoint);
+                        newCheckpoint.remove(topicPartition);
+                        return newCheckpoint;
+                    }).orElse(currentCheckpoint);
+
+                    // update or add the partition offset if any
+                    Map<TopicPartition, Long> tempUpdatedCheckpoint = 
updatedCheckpoint;
+                    updatedCheckpoint = partitionToUpdateOrAdd.map(entry -> {
+                        Map<TopicPartition, Long> newCheckpoint = new 
HashMap<>(tempUpdatedCheckpoint);
+                        newCheckpoint.put(entry.getKey(), entry.getValue());
+                        return newCheckpoint;
+                    }).orElse(updatedCheckpoint);
+
+                    checkpoint.write(updatedCheckpoint);
+                } catch (KafkaStorageException e) {
+                    LOG.error("Failed to access checkpoint file {} in dir {}",
+                            checkpoint.file().getName(), 
checkpoint.file().getParentFile().getAbsolutePath(), e);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * alter the checkpoint directory for the topicPartition, to remove the 
data in sourceLogDir, and add the data in destLogDir
+     */
+    public void alterCheckpointDir(TopicPartition topicPartition, File 
sourceLogDir, File destLogDir) {
+        inLock(lock, () -> {
+            try {
+                Optional<Long> offsetOpt = 
Optional.ofNullable(checkpoints.get(sourceLogDir))
+                        .flatMap(checkpoint -> 
Optional.ofNullable(checkpoint.read().get(topicPartition)));
+
+                offsetOpt.ifPresent(offset -> {
+                    LOG.debug("Removing the partition offset data in 
checkpoint file for '{}' from {} directory.",
+                            topicPartition, sourceLogDir.getAbsoluteFile());
+                    updateCheckpoints(sourceLogDir, Optional.empty(), 
Optional.of(topicPartition));
+
+                    LOG.debug("Adding the partition offset data in checkpoint 
file for '{}' to {} directory.",
+                            topicPartition, destLogDir.getAbsoluteFile());
+                    updateCheckpoints(destLogDir, 
Optional.of(Map.entry(topicPartition, offset)), Optional.empty());
+                });
+            } catch (KafkaStorageException e) {
+                LOG.error("Failed to access checkpoint file in dir {}", 
sourceLogDir.getAbsolutePath(), e);
+            }
+
+            Set<TopicPartition> logUncleanablePartitions = 
uncleanablePartitions.getOrDefault(sourceLogDir.toString(), 
Collections.emptySet());
+            if (logUncleanablePartitions.contains(topicPartition)) {
+                logUncleanablePartitions.remove(topicPartition);
+                markPartitionUncleanable(destLogDir.toString(), 
topicPartition);
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Stop cleaning logs in the provided directory
+     *
+     * @param dir the absolute path of the log dir
+     */
+    public void handleLogDirFailure(String dir) {
+        LOG.warn("Stopping cleaning logs in dir {}", dir);
+        inLock(lock, () -> {
+            checkpoints = checkpoints.entrySet().stream()
+                    .filter(entry -> 
!entry.getKey().getAbsolutePath().equals(dir))
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+            return null;
+        });
+    }
+
+    /**
+     * Truncate the checkpointed offset for the given partition if its 
checkpointed offset is larger than the given offset
+     */
+    public void maybeTruncateCheckpoint(File dataDir, TopicPartition 
topicPartition, long offset) {
+        inLock(lock, () -> {
+            if (logs.get(topicPartition).config().compact) {
+                OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
+                if (checkpoint != null) {
+                    Map<TopicPartition, Long> existing = checkpoint.read();
+                    if (existing.getOrDefault(topicPartition, 0L) > offset) {
+                        existing.put(topicPartition, offset);
+                        checkpoint.write(existing);
+                    }
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Save out the endOffset and remove the given log from the in-progress 
set, if not aborted.
+     */
+    public void doneCleaning(TopicPartition topicPartition, File dataDir, long 
endOffset) {
+        inLock(lock, () -> {
+            LogCleaningState state = inProgress.get(topicPartition);
+
+            if (state == null) {
+                throw new IllegalStateException("State for partition " + 
topicPartition + " should exist.");
+            } else if (state instanceof 
LogCleaningState.LogCleaningInProgress) {
+                updateCheckpoints(dataDir, 
Optional.of(Map.entry(topicPartition, endOffset)), Optional.empty());
+                inProgress.remove(topicPartition);
+            } else if (state instanceof LogCleaningState.LogCleaningAborted) {
+                inProgress.put(topicPartition, new 
LogCleaningState.LogCleaningPaused(1));
+                pausedCleaningCond.signalAll();
+            } else {
+                throw new IllegalStateException("In-progress partition " + 
topicPartition + " cannot be in " + state + " state.");
+            }
+
+            return null;
+        });
+    }
+
+    public void doneDeleting(List<TopicPartition> topicPartitions) {
+        inLock(lock, () -> {
+            topicPartitions.forEach(topicPartition -> {
+                LogCleaningState logCleaningState = 
inProgress.get(topicPartition);
+
+                if (logCleaningState == null) {
+                    throw new IllegalStateException("State for partition " + 
topicPartition + " should exist.");
+                } else if (logCleaningState == 
LogCleaningState.LogCleaningInProgress.getInstance()) {
+                    inProgress.remove(topicPartition);
+                } else if (logCleaningState == 
LogCleaningState.LogCleaningAborted.getInstance()) {
+                    inProgress.put(topicPartition, new 
LogCleaningState.LogCleaningPaused(1));
+                    pausedCleaningCond.signalAll();
+                } else {
+                    throw new IllegalStateException("In-progress partition " + 
topicPartition + " cannot be in " + logCleaningState + " state.");
+                }
+            });
+
+            return null;
+        });
+    }
+
+    /**
+     * Returns an immutable set of the uncleanable partitions for a given log 
directory
+     * Only used for testing
+     */
+    public Set<TopicPartition> uncleanablePartitions(String logDir) {
+        return inLock(lock, () -> uncleanablePartitions.getOrDefault(logDir, 
Set.of()));

Review Comment:
   Here we are returning a `Set` that could change after it is returned. The 
Scala code returns an immutable set.
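   One possible fix (a sketch): copy on the way out, e.g. with `Set.copyOf` 
(Java 10+), which returns an unmodifiable copy:
   
   ```
   public Set<TopicPartition> uncleanablePartitions(String logDir) {
       // Set.copyOf returns an unmodifiable copy, so later mutations of the
       // internal set are not visible to callers.
       return inLock(lock, () -> Set.copyOf(uncleanablePartitions.getOrDefault(logDir, Set.of())));
   }
   ```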



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -99,7 +100,7 @@ import scala.util.control.ControlThrowable
  */
 class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
-                 val logs: Pool[TopicPartition, UnifiedLog],
+                 val logs: util.concurrent.ConcurrentMap[TopicPartition, 
UnifiedLog],

Review Comment:
   Could we change the javadoc too?
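   For example, assuming the class-level scaladoc still describes `logs` as a 
pool, the corresponding line could become something like (hypothetical 
wording):
   
   ```
    * @param logs The map of logs to clean, keyed by topic-partition
   ```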



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be 
in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this 
state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state 
are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this 
state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous 
state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this 
state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning 
can be done in this state.
+ *                              In this state, it can become None or 
LogCleaningPaused(2).
+ *                              Valid previous state is None, 
LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No 
log cleaning can be done in this state.
+ *                              In this state, it can become 
LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) 
or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = 
"cleaner-offset-checkpoint";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = 
"uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = 
"uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = 
"max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = 
"time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = 
Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under 
`kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new 
HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an 
unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new 
HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the 
offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> 
gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, 
OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable 
for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> 
uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, 
k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from 
uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = 
uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = 
allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = 
Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = 
cleanableOffsets(log, lastCleanOffset, now);
+                                            return 
calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new 
ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 
* dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in 
milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> 
Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), 
entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir 
{}",
+                                checkpoint.file().getName(), 
checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    }
+
+    /**
+     * Package private for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Package private for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We 
recompute this
+     * each time from the full set of logs to allow logs to be dynamically 
added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, 
PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact)
+                    .filter(entry -> !(inProgress.containsKey(entry.getKey()) 
|| isUncleanablePartition(entry.getValue(), entry.getKey())))

Review Comment:
   Perhaps the following is clearer?
   
   ```
       .filter(entry -> entry.getValue().config().compact &&
                        !inProgress.containsKey(entry.getKey()) &&
                        !isUncleanablePartition(entry.getValue(), entry.getKey()))
   ```
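   The two forms are equivalent by De Morgan's laws (`!(a || b)` is the same 
as `!a && !b`), so this is purely a readability change; folding the condition 
into the first `filter` also removes one stream stage.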



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be 
in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this 
state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state 
are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this 
state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous 
state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this 
state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning 
can be done in this state.
+ *                              In this state, it can become None or 
LogCleaningPaused(2).
+ *                              Valid previous state is None, 
LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No 
log cleaning can be done in this state.
+ *                              In this state, it can become 
LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) 
or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = 
"cleaner-offset-checkpoint";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = 
"uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = 
"uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = 
"max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = 
"time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = 
Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under 
`kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new 
HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an 
unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new 
HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the 
offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> 
gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, 
OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable 
for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> 
uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, 
k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from 
uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = 
uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = 
allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = 
Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = 
cleanableOffsets(log, lastCleanOffset, now);
+                                            return 
calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new 
ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 
* dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in 
milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> 
Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), 
entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir 
{}",
+                                checkpoint.file().getName(), 
checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    }
+
+    /**
+     * Package private for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Package private for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We 
recompute this
+     * each time from the full set of logs to allow logs to be dynamically 
added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, 
PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact)
+                    .filter(entry -> !(inProgress.containsKey(entry.getKey()) 
|| isUncleanablePartition(entry.getValue(), entry.getKey())))
+                    .map(entry -> {
+                                // create a LogToClean instance for each
+                                TopicPartition topicPartition = entry.getKey();
+                                UnifiedLog log = entry.getValue();
+                                try {
+                                    Long lastCleanOffset = 
lastClean.get(topicPartition);
+                                    OffsetsToClean offsetsToClean = 
cleanableOffsets(log, Optional.ofNullable(lastCleanOffset), now);
+                                    // update checkpoint for logs with invalid 
checkpointed offsets
+                                    if (offsetsToClean.forceUpdateCheckpoint) {
+                                        updateCheckpoints(log.parentDirFile(), 
Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset)), 
Optional.empty());
+                                    }
+                                    long compactionDelayMs = 
maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now);
+                                    preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
+
+                                    return new LogToClean(topicPartition, log, 
offsetsToClean.firstDirtyOffset,
+                                            offsetsToClean.firstUncleanableDirtyOffset, compactionDelayMs > 0);
+                                } catch (Throwable e) {
+                                    throw new LogCleaningException(log, 
"Failed to calculate log cleaning stats for partition " + topicPartition, e);
+                                }
+                            }
+                    ).filter(ltc -> ltc.totalBytes() > 0) // skip any empty 
logs
+                    .toList();
+
+            this.dirtiestLogCleanableRatio = dirtyLogs.isEmpty()
+                    ? 0
+                    : dirtyLogs.stream()
+                    .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                    .map(LogToClean::cleanableRatio)
+                    .orElse(0.0);
+            // and must meet the minimum threshold for dirty byte ratio or 
have some bytes required to be compacted
+            List<LogToClean> cleanableLogs = dirtyLogs.stream()
+                    .filter(ltc -> (ltc.needCompactionNow() && 
ltc.cleanableBytes() > 0) || ltc.cleanableRatio() > 
ltc.log().config().minCleanableRatio)
+                    .toList();
+
+            if (cleanableLogs.isEmpty()) {
+                return Optional.empty();
+            } else {
+                preCleanStats.recordCleanablePartitions(cleanableLogs.size());
+                LogToClean filthiest = cleanableLogs.stream()
+                        .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                        .orElseThrow(() -> new IllegalStateException("No 
filthiest log found"));
+
+                inProgress.put(filthiest.topicPartition(), 
LogCleaningState.LogCleaningInProgress.getInstance());
+                return Optional.of(filthiest);
+            }
+        });
+    }
+
+    /**
+     * Pause logs cleaning for logs that do not have compaction enabled
+     * and do not have other deletion or compaction in progress.
+     * This is to handle potential race between retention and cleaner threads 
when users
+     * switch topic configuration between compacted and non-compacted topic.
+     *
+     * @return retention logs that have log cleaning successfully paused
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> 
pauseCleaningForNonCompactedPartitions() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs = 
logs.entrySet().stream()
+                    .filter(entry -> !entry.getValue().config().compact) // 
pick non-compacted logs
+                    .filter(entry -> !inProgress.containsKey(entry.getKey())) 
// skip any logs already in-progress
+                    .collect(Collectors.toList());
+
+            deletableLogs.forEach(entry -> inProgress.put(entry.getKey(), new 
LogCleaningState.LogCleaningPaused(1)));
+
+            return deletableLogs;
+        });
+    }
+
+    /**
+     * Find any logs that have compaction enabled. Mark them as being cleaned
+     * Include logs without delete enabled, as they may have segments
+     * that precede the start offset.
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> toClean = 
logs.entrySet().stream()
+                    .filter(entry -> {
+                        TopicPartition topicPartition = entry.getKey();
+                        UnifiedLog log = entry.getValue();
+                        return !inProgress.containsKey(topicPartition) && 
log.config().compact &&
+                                !isUncleanablePartition(log, topicPartition);
+                    })
+                    .collect(Collectors.toList());
+            toClean.forEach(entry -> inProgress.put(entry.getKey(), 
LogCleaningState.LogCleaningInProgress.getInstance()));
+            return toClean;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition, if it's in progress. This 
call blocks until the cleaning of
+     * the partition is aborted.
+     * This is implemented by first abortAndPausing and then resuming the 
cleaning of the partition.
+     */
+    public void abortCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            abortAndPauseCleaning(topicPartition);
+            resumeCleaning(List.of(topicPartition));
+            return null;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition if it's in progress, and 
pause any future cleaning of this partition.
+     * This call blocks until the cleaning of the partition is aborted and 
paused.
+     * 1. If the partition is not in progress, mark it as paused.
+     * 2. Otherwise, first mark the state of the partition as aborted.
+     * 3. The cleaner thread checks the state periodically and if it sees the 
state of the partition is aborted, it
+     *    throws a LogCleaningAbortedException to stop the cleaning task.
+     * 4. When the cleaning task is stopped, doneCleaning() is called, which 
sets the state of the partition as paused.
+     * 5. abortAndPauseCleaning() waits until the state of the partition is 
changed to paused.
+     * 6. If the partition is already paused, a new call to this function
+     *    will increase the paused count by one.
+     */
+    public void abortAndPauseCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            LogCleaningState state = inProgress.get(topicPartition);
+
+            if (state == null) {
+                inProgress.put(topicPartition, new 
LogCleaningState.LogCleaningPaused(1));
+            } else if (state instanceof 
LogCleaningState.LogCleaningInProgress) {
+                inProgress.put(topicPartition, 
LogCleaningState.LogCleaningAborted.getInstance());
+            } else if (state instanceof LogCleaningState.LogCleaningPaused 
logCleaningPaused) {
+                inProgress.put(topicPartition, new 
LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() + 1));
+            } else {
+                throw new IllegalStateException("Compaction for partition " + 
topicPartition +
+                        " cannot be aborted and paused since it is in " + 
state + " state.");
+            }
+
+            while (!isCleaningInStatePaused(topicPartition)) {
+                try {
+                    pausedCleaningCond.await(100, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Resume the cleaning of paused partitions.
+     * Each call of this function will undo one pause.
+     */
+    public void resumeCleaning(List<TopicPartition> topicPartitions) {
+        inLock(lock, () -> {
+            topicPartitions.forEach(topicPartition -> {
+                LogCleaningState state = inProgress.get(topicPartition);
+
+                if (state == null) {
+                    throw new IllegalStateException("Compaction for partition 
" + topicPartition + " cannot be resumed since it is not paused.");
+                }
+
+                if (state instanceof LogCleaningState.LogCleaningPaused 
logCleaningPaused) {
+                    if (logCleaningPaused.getPausedCount() == 1) {
+                        inProgress.remove(topicPartition);
+                    } else if (logCleaningPaused.getPausedCount() > 1) {
+                        inProgress.put(topicPartition, new 
LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() - 1));
+                    }
+                } else {
+                    throw new IllegalStateException("Compaction for partition 
" + topicPartition +
+                            " cannot be resumed since it is in " + state + " 
state.");
+                }
+            });
+
+            return null;
+        });
+    }
+
+    /**
+     * Check if the cleaning for a partition is in a particular state. The 
caller is expected to hold lock while making the call.
+     */
+    private boolean isCleaningInState(TopicPartition topicPartition, 
LogCleaningState expectedState) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state == expectedState;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is paused. The caller is expected 
to hold lock while making the call.
+     */
+    private boolean isCleaningInStatePaused(TopicPartition topicPartition) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state instanceof LogCleaningState.LogCleaningPaused;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is aborted. If so, throw an 
exception.
+     */
+    public void checkCleaningAborted(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            if (isCleaningInState(topicPartition, 
LogCleaningState.LogCleaningAborted.getInstance())) {
+                throw new LogCleaningAbortedException();
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Update checkpoint file, adding or removing partitions if necessary.
+     *
+     * @param dataDir                The File object to be updated
+     * @param partitionToUpdateOrAdd The [TopicPartition, Long] map data to be 
updated. pass "none" if doing remove, not add
+     * @param partitionToRemove      The TopicPartition to be removed
+     */
+    public void updateCheckpoints(
+            File dataDir,
+            Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
+            Optional<TopicPartition> partitionToRemove
+    ) {
+        inLock(lock, () -> {
+            OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
+            if (checkpoint != null) {
+                try {
+                    Map<TopicPartition, Long> currentCheckpoint = 
checkpoint.read().entrySet().stream()
+                            .filter(entry -> logs.containsKey(entry.getKey()))
+                            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                    // remove the partition offset if any
+                    Map<TopicPartition, Long> updatedCheckpoint = 
partitionToRemove.map(topicPartition -> {
+                        Map<TopicPartition, Long> newCheckpoint = new 
HashMap<>(currentCheckpoint);
+                        newCheckpoint.remove(topicPartition);
+                        return newCheckpoint;
+                    }).orElse(currentCheckpoint);
+
+                    // update or add the partition offset if any
+                    Map<TopicPartition, Long> tempUpdatedCheckpoint = 
updatedCheckpoint;
+                    updatedCheckpoint = partitionToUpdateOrAdd.map(entry -> {
+                        Map<TopicPartition, Long> newCheckpoint = new 
HashMap<>(tempUpdatedCheckpoint);
+                        newCheckpoint.put(entry.getKey(), entry.getValue());
+                        return newCheckpoint;
+                    }).orElse(updatedCheckpoint);
+
+                    checkpoint.write(updatedCheckpoint);

Review Comment:
   Is the following easier to understand?
   
   ```
   Map<TopicPartition, Long> updatedCheckpoint = new HashMap<>(currentCheckpoint);
   
   // Remove the partition offset if present
   partitionToRemove.ifPresent(updatedCheckpoint::remove);
   
   // Update or add the partition offset if present
   partitionToUpdateOrAdd.ifPresent(entry -> updatedCheckpoint.put(entry.getKey(), entry.getValue()));
   
   // Write back the updated checkpoint
   checkpoint.write(updatedCheckpoint);
   ```
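   A side benefit: this drops the `tempUpdatedCheckpoint` temporary, which 
exists only because a variable captured by a lambda must be effectively final. 
Callers are unchanged; for example, `alterCheckpointDir` already exercises both 
optional parameters:
   
   ```
   updateCheckpoints(sourceLogDir, Optional.empty(), Optional.of(topicPartition));
   updateCheckpoints(destLogDir, Optional.of(Map.entry(topicPartition, offset)), Optional.empty());
   ```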



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be 
in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this 
state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state 
are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this 
state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous 
state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this 
state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning 
can be done in this state.
+ *                              In this state, it can become None or 
LogCleaningPaused(2).
+ *                              Valid previous state is None, 
LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No 
log cleaning can be done in this state.
+ *                              In this state, it can become 
LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) 
or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = 
"cleaner-offset-checkpoint";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = 
"uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = 
"uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = 
"max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = 
"time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = 
Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under 
`kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /* the set of logs currently being cleaned */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new 
HashMap<>();
+
+    /* the set of uncleanable partitions (partitions that have raised an 
unexpected error during cleaning)
+     *   for each log directory */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new 
HashMap<>();
+
+    /* a global lock used to control all access to the in-progress set and the 
offset checkpoints */
+    private final Lock lock = new ReentrantLock();
+
+    /* for coordinating the pausing and the cleaning of a partition */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    // Visible for testing
+    private final Map<String, List<Map<String, String>>> 
gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /* the offset checkpoints holding the last cleaned point for each log */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        this.checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0L;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            // use ofNullable: the partition may not have a checkpointed offset yet
+                                            Optional<Long> lastCleanOffset = Optional.ofNullable(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = cleanableOffsets(log, lastCleanOffset, now);
+                                            return calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        /* a gauge for tracking the cleanable ratio of the dirtiest log */
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 * dirtiestLogCleanableRatio));
+
+        /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the last cleaned offset (checkpoint) for each log.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream()
+                                .map(entry -> Map.entry(entry.getKey(), entry.getValue()));
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir {}",
+                                checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
+    /**
+     * Visible for testing. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Visible for testing. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We recompute this
+     * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            this.timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact)
+                    .filter(entry -> !(inProgress.containsKey(entry.getKey()) || isUncleanablePartition(entry.getValue(), entry.getKey())))
+                    .map(entry -> {
+                                // create a LogToClean instance for each log
+                                TopicPartition topicPartition = entry.getKey();
+                                UnifiedLog log = entry.getValue();
+                                try {
+                                    Long lastCleanOffset = lastClean.get(topicPartition);
+                                    OffsetsToClean offsetsToClean = cleanableOffsets(log, Optional.ofNullable(lastCleanOffset), now);
+                                    // update checkpoint for logs with invalid checkpointed offsets
+                                    if (offsetsToClean.forceUpdateCheckpoint()) {
+                                        updateCheckpoints(log.parentDirFile(), Optional.of(Map.entry(topicPartition, offsetsToClean.firstDirtyOffset())), Optional.empty());
+                                    }
+                                    long compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset(), now);
+                                    preCleanStats.updateMaxCompactionDelay(compactionDelayMs);
+
+                                    return new LogToClean(topicPartition, log, offsetsToClean.firstDirtyOffset(),
+                                            offsetsToClean.firstUncleanableDirtyOffset(), compactionDelayMs > 0);
+                                } catch (Throwable e) {
+                                    throw new LogCleaningException(log, "Failed to calculate log cleaning stats for partition " + topicPartition, e);
+                                }
+                            }
+                    ).filter(ltc -> ltc.totalBytes() > 0) // skip any empty logs
+                    .toList();
+
+            this.dirtiestLogCleanableRatio = dirtyLogs.isEmpty()
+                    ? 0
+                    : dirtyLogs.stream()
+                    .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                    .map(LogToClean::cleanableRatio)
+                    .orElse(0.0);
+            // the logs must meet the minimum threshold for dirty byte ratio or have some bytes required to be compacted
+            List<LogToClean> cleanableLogs = dirtyLogs.stream()
+                    .filter(ltc -> (ltc.needCompactionNow() && ltc.cleanableBytes() > 0) || ltc.cleanableRatio() > ltc.log().config().minCleanableRatio)
+                    .toList();
+
+            if (cleanableLogs.isEmpty()) {
+                return Optional.empty();
+            } else {
+                preCleanStats.recordCleanablePartitions(cleanableLogs.size());
+                LogToClean filthiest = cleanableLogs.stream()
+                        .max(Comparator.comparingDouble(LogToClean::cleanableRatio))
+                        .orElseThrow(() -> new IllegalStateException("No filthiest log found"));
+
+                inProgress.put(filthiest.topicPartition(), LogCleaningState.LogCleaningInProgress.getInstance());
+                return Optional.of(filthiest);
+            }
+        });
+    }
+
+    /**
+     * Pause log cleaning for logs that do not have compaction enabled
+     * and do not have any other deletion or compaction in progress.
+     * This handles a potential race between the retention and cleaner threads when users
+     * switch a topic's configuration between compacted and non-compacted.
+     *
+     * @return retention logs that have log cleaning successfully paused
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> pauseCleaningForNonCompactedPartitions() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs = logs.entrySet().stream()
+                    .filter(entry -> !entry.getValue().config().compact) // pick non-compacted logs
+                    .filter(entry -> !inProgress.containsKey(entry.getKey())) // skip any logs already in-progress
+                    .collect(Collectors.toList());
+
+            deletableLogs.forEach(entry -> inProgress.put(entry.getKey(), new LogCleaningState.LogCleaningPaused(1)));
+
+            return deletableLogs;
+        });
+    }
+
+    /**
+     * Find any logs that have compaction enabled and mark them as being cleaned.
+     * Include logs without delete enabled, as they may have segments
+     * that precede the start offset.
+     */
+    public List<Map.Entry<TopicPartition, UnifiedLog>> deletableLogs() {
+        return inLock(lock, () -> {
+            List<Map.Entry<TopicPartition, UnifiedLog>> toClean = logs.entrySet().stream()
+                    .filter(entry -> {
+                        TopicPartition topicPartition = entry.getKey();
+                        UnifiedLog log = entry.getValue();
+                        return !inProgress.containsKey(topicPartition) && log.config().compact &&
+                                !isUncleanablePartition(log, topicPartition);
+                    })
+                    .collect(Collectors.toList());
+            toClean.forEach(entry -> inProgress.put(entry.getKey(), LogCleaningState.LogCleaningInProgress.getInstance()));
+            return toClean;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+     * the partition is aborted.
+     * This is implemented by first calling abortAndPauseCleaning() and then resuming the cleaning of the partition.
+     */
+    public void abortCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            abortAndPauseCleaning(topicPartition);
+            resumeCleaning(List.of(topicPartition));
+            return null;
+        });
+    }
+
+    /**
+     * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+     * This call blocks until the cleaning of the partition is aborted and paused.
+     * 1. If the partition is not in progress, mark it as paused.
+     * 2. Otherwise, first mark the state of the partition as aborted.
+     * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
+     *    throws a LogCleaningAbortedException to stop the cleaning task.
+     * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
+     * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+     * 6. If the partition is already paused, a new call to this function
+     *    will increase the paused count by one.
+     */
+    public void abortAndPauseCleaning(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            LogCleaningState state = inProgress.get(topicPartition);
+
+            if (state == null) {
+                inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(1));
+            } else if (state instanceof LogCleaningState.LogCleaningInProgress) {
+                inProgress.put(topicPartition, LogCleaningState.LogCleaningAborted.getInstance());
+            } else if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
+                inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() + 1));
+            } else {
+                throw new IllegalStateException("Compaction for partition " + topicPartition +
+                        " cannot be aborted and paused since it is in " + state + " state.");
+            }
+
+            while (!isCleaningInStatePaused(topicPartition)) {
+                try {
+                    pausedCleaningCond.await(100, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Resume the cleaning of paused partitions.
+     * Each call of this function will undo one pause.
+     */
+    public void resumeCleaning(List<TopicPartition> topicPartitions) {
+        inLock(lock, () -> {
+            topicPartitions.forEach(topicPartition -> {
+                LogCleaningState state = inProgress.get(topicPartition);
+
+                if (state == null) {
+                    throw new IllegalStateException("Compaction for partition " + topicPartition + " cannot be resumed since it is not paused.");
+                }
+
+                if (state instanceof LogCleaningState.LogCleaningPaused logCleaningPaused) {
+                    if (logCleaningPaused.getPausedCount() == 1) {
+                        inProgress.remove(topicPartition);
+                    } else if (logCleaningPaused.getPausedCount() > 1) {
+                        inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(logCleaningPaused.getPausedCount() - 1));
+                    }
+                } else {
+                    throw new IllegalStateException("Compaction for partition " + topicPartition +
+                            " cannot be resumed since it is in " + state + " state.");
+                }
+            });
+
+            return null;
+        });
+    }
+
+    /**
+     * Check if the cleaning for a partition is in a particular state. The caller is expected to hold the lock while making the call.
+     */
+    private boolean isCleaningInState(TopicPartition topicPartition, LogCleaningState expectedState) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state == expectedState;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is paused. The caller is expected to hold the lock while making the call.
+     */
+    private boolean isCleaningInStatePaused(TopicPartition topicPartition) {
+        LogCleaningState state = inProgress.get(topicPartition);
+
+        if (state == null) {
+            return false;
+        } else {
+            return state instanceof LogCleaningState.LogCleaningPaused;
+        }
+    }
+
+    /**
+     * Check if the cleaning for a partition is aborted. If so, throw an exception.
+     */
+    public void checkCleaningAborted(TopicPartition topicPartition) {
+        inLock(lock, () -> {
+            if (isCleaningInState(topicPartition, LogCleaningState.LogCleaningAborted.getInstance())) {
+                throw new LogCleaningAbortedException();
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Update checkpoint file, adding or removing partitions if necessary.
+     *
+     * @param dataDir                The log directory whose checkpoint file is updated
+     * @param partitionToUpdateOrAdd The (TopicPartition, offset) entry to update or add; pass Optional.empty() when only removing
+     * @param partitionToRemove      The TopicPartition whose entry should be removed, if any
+     */
+    public void updateCheckpoints(
+            File dataDir,
+            Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
+            Optional<TopicPartition> partitionToRemove
+    ) {
+        inLock(lock, () -> {
+            OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
+            if (checkpoint != null) {
+                try {
+                    Map<TopicPartition, Long> currentCheckpoint = checkpoint.read().entrySet().stream()
+                            .filter(entry -> logs.containsKey(entry.getKey()))
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                    // remove the partition offset if any
+                    Map<TopicPartition, Long> updatedCheckpoint = partitionToRemove.map(topicPartition -> {
+                        Map<TopicPartition, Long> newCheckpoint = new HashMap<>(currentCheckpoint);
+                        newCheckpoint.remove(topicPartition);
+                        return newCheckpoint;
+                    }).orElse(currentCheckpoint);
+
+                    // update or add the partition offset if any
+                    Map<TopicPartition, Long> tempUpdatedCheckpoint = updatedCheckpoint;
+                    updatedCheckpoint = partitionToUpdateOrAdd.map(entry -> {
+                        Map<TopicPartition, Long> newCheckpoint = new HashMap<>(tempUpdatedCheckpoint);
+                        newCheckpoint.put(entry.getKey(), entry.getValue());
+                        return newCheckpoint;
+                    }).orElse(updatedCheckpoint);
+
+                    checkpoint.write(updatedCheckpoint);
+                } catch (KafkaStorageException e) {
+                    LOG.error("Failed to access checkpoint file {} in dir {}",
+                            checkpoint.file().getName(), checkpoint.file().getParentFile().getAbsolutePath(), e);
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Alter the checkpoint directory for the topicPartition: remove the checkpoint entry from sourceLogDir and add it to destLogDir
+     */
+    public void alterCheckpointDir(TopicPartition topicPartition, File sourceLogDir, File destLogDir) {
+        inLock(lock, () -> {
+            try {
+                Optional<Long> offsetOpt = Optional.ofNullable(checkpoints.get(sourceLogDir))
+                        .flatMap(checkpoint -> Optional.ofNullable(checkpoint.read().get(topicPartition)));
+
+                offsetOpt.ifPresent(offset -> {
+                    LOG.debug("Removing the partition offset data in checkpoint file for '{}' from {} directory.",
+                            topicPartition, sourceLogDir.getAbsoluteFile());
+                    updateCheckpoints(sourceLogDir, Optional.empty(), Optional.of(topicPartition));
+
+                    LOG.debug("Adding the partition offset data in checkpoint file for '{}' to {} directory.",
+                            topicPartition, destLogDir.getAbsoluteFile());
+                    updateCheckpoints(destLogDir, Optional.of(Map.entry(topicPartition, offset)), Optional.empty());
+                });
+            } catch (KafkaStorageException e) {
+                LOG.error("Failed to access checkpoint file in dir {}", sourceLogDir.getAbsolutePath(), e);
+            }
+
+            Set<TopicPartition> logUncleanablePartitions = uncleanablePartitions.getOrDefault(sourceLogDir.toString(), Collections.emptySet());
+            if (logUncleanablePartitions.contains(topicPartition)) {
+                logUncleanablePartitions.remove(topicPartition);
+                markPartitionUncleanable(destLogDir.toString(), topicPartition);
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Stop cleaning logs in the provided directory
+     *
+     * @param dir the absolute path of the log dir
+     */
+    public void handleLogDirFailure(String dir) {
+        LOG.warn("Stopping cleaning logs in dir {}", dir);
+        inLock(lock, () -> {
+            checkpoints = checkpoints.entrySet().stream()
+                    .filter(entry -> !entry.getKey().getAbsolutePath().equals(dir))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            return null;
+        });
+    }
+
+    /**
+     * Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset
+     */
+    public void maybeTruncateCheckpoint(File dataDir, TopicPartition topicPartition, long offset) {
+        inLock(lock, () -> {
+            if (logs.get(topicPartition).config().compact) {
+                OffsetCheckpointFile checkpoint = checkpoints.get(dataDir);
+                if (checkpoint != null) {
+                    Map<TopicPartition, Long> existing = checkpoint.read();
+                    if (existing.getOrDefault(topicPartition, 0L) > offset) {
+                        existing.put(topicPartition, offset);
+                        checkpoint.write(existing);
+                    }
+                }
+            }
+
+            return null;
+        });
+    }
+
+    /**
+     * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
+     */
+    public void doneCleaning(TopicPartition topicPartition, File dataDir, long endOffset) {
+        inLock(lock, () -> {
+            LogCleaningState state = inProgress.get(topicPartition);
+
+            if (state == null) {
+                throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
+            } else if (state instanceof LogCleaningState.LogCleaningInProgress) {
+                updateCheckpoints(dataDir, Optional.of(Map.entry(topicPartition, endOffset)), Optional.empty());
+                inProgress.remove(topicPartition);
+            } else if (state instanceof LogCleaningState.LogCleaningAborted) {
+                inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(1));
+                pausedCleaningCond.signalAll();
+            } else {
+                throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + state + " state.");
+            }
+
+            return null;
+        });
+    }
+
+    public void doneDeleting(List<TopicPartition> topicPartitions) {
+        inLock(lock, () -> {
+            topicPartitions.forEach(topicPartition -> {
+                LogCleaningState logCleaningState = inProgress.get(topicPartition);
+
+                if (logCleaningState == null) {
+                    throw new IllegalStateException("State for partition " + topicPartition + " should exist.");
+                } else if (logCleaningState == LogCleaningState.LogCleaningInProgress.getInstance()) {
+                    inProgress.remove(topicPartition);
+                } else if (logCleaningState == LogCleaningState.LogCleaningAborted.getInstance()) {
+                    inProgress.put(topicPartition, new LogCleaningState.LogCleaningPaused(1));
+                    pausedCleaningCond.signalAll();
+                } else {
+                    throw new IllegalStateException("In-progress partition " + topicPartition + " cannot be in " + logCleaningState + " state.");
+                }
+            });
+
+            return null;
+        });
+    }
+
+    /**
+     * Returns an immutable copy of the uncleanable partitions for a given log directory.
+     * Only used for testing.
+     */
+    public Set<TopicPartition> uncleanablePartitions(String logDir) {
+        // copy so callers cannot observe later mutations of the internal set
+        return inLock(lock, () -> Set.copyOf(uncleanablePartitions.getOrDefault(logDir, Set.of())));
+    }
+
+    public void markPartitionUncleanable(String logDir, TopicPartition partition) {
+        inLock(lock, () -> {
+            Set<TopicPartition> partitions = uncleanablePartitions.get(logDir);
+
+            if (partitions == null) {
+                Set<TopicPartition> newPartitions = new HashSet<>();
+                newPartitions.add(partition);
+                uncleanablePartitions.put(logDir, newPartitions);
+            } else {
+                partitions.add(partition);
+            }
+
+            return null;
+        });
+    }
+
+    private boolean isUncleanablePartition(UnifiedLog log, TopicPartition topicPartition) {
+        return inLock(lock, () -> Optional.ofNullable(uncleanablePartitions.get(log.parentDir()))
+                .map(partitions -> partitions.contains(topicPartition))
+                .orElse(false)
+        );
+    }
+
+    public void maintainUncleanablePartitions() {
+        // Remove deleted partitions from uncleanablePartitions
+        inLock(lock, () -> {
+            // Remove deleted partitions
+            uncleanablePartitions.values().forEach(partitions ->
+                    partitions.removeIf(partition -> !logs.containsKey(partition)));
+
+            // Remove entries with empty partition set.
+            uncleanablePartitions.entrySet().removeIf(entry -> entry.getValue().isEmpty());
+
+            return null;
+        });
+    }
+
+    public void removeMetrics() {
+        GAUGE_METRIC_NAME_NO_TAG.forEach(metricsGroup::removeMetric);
+        gaugeMetricNameWithTag.forEach((metricName, tags) ->
+                tags.forEach(tag -> metricsGroup.removeMetric(metricName, tag)));
+        gaugeMetricNameWithTag.clear();
+    }
+
+    private <T> T inLock(Lock lock, Supplier<T> supplier) {

Review Comment:
   Could we add this to server-common so that it could be reused?
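   
   For illustration, a minimal sketch of what such a reusable helper in server-common could look like (the `LockUtils` class and package below are illustrative assumptions, not something this PR adds):
   
   ```java
   package org.apache.kafka.server.util; // hypothetical home in server-common
   
   import java.util.concurrent.locks.Lock;
   import java.util.function.Supplier;
   
   public final class LockUtils {
       private LockUtils() {}
   
       /** Runs the supplier while holding the given lock, always releasing the lock afterwards. */
       public static <T> T inLock(Lock lock, Supplier<T> supplier) {
           lock.lock();
           try {
               return supplier.get();
           } finally {
               lock.unlock();
           }
       }
   }
   ```
   
   Call sites such as this class could then use `LockUtils.inLock(lock, () -> ...)` instead of each class keeping its own private copy of the helper.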



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

