junrao commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2007932566


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogToClean.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
+/**
+ * Helper class for a log, its topic/partition, the first cleanable position, 
the first uncleanable dirty position,
+ * and whether it needs compaction immediately.
+ */
+public final class LogToClean implements Comparable<LogToClean> {
+    private final TopicPartition topicPartition;
+    private final UnifiedLog log;
+    private final long firstDirtyOffset;
+    private final long uncleanableOffset;

Review Comment:
   This field is not needed at the instance level.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##########
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.server.util.LockUtils.inLock;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be 
in.
+ * 1. None                    : No cleaning state in a TopicPartition. In this 
state, it can become LogCleaningInProgress
+ *                              or LogCleaningPaused(1). Valid previous state 
are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this 
state, it can become None when log cleaning is finished
+ *                              or become LogCleaningAborted. Valid previous 
state is None.
+ * 3. LogCleaningAborted      : The cleaning abort is requested. In this 
state, it can become LogCleaningPaused(1).
+ *                              Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning 
can be done in this state.
+ *                              In this state, it can become None or 
LogCleaningPaused(2).
+ *                              Valid previous state is None, 
LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No 
log cleaning can be done in this state.
+ *                              In this state, it can become 
LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                              Valid previous state is LogCleaningPaused(i-1) 
or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = 
"cleaner-offset-checkpoint";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = 
"uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = 
"uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = 
"max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = 
"time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = 
Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under 
`kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /**
+     * The set of logs currently being cleaned.
+     */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new 
HashMap<>();
+
+    /**
+     * The set of uncleanable partitions (partitions that have raised an 
unexpected error during cleaning)
+     * for each log directory.
+     */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new 
HashMap<>();
+
+    /**
+     * A global lock used to control all access to the in-progress set and the 
offset checkpoints.
+     */
+    private final Lock lock = new ReentrantLock();
+
+    /**
+     * For coordinating the pausing and the cleaning of a partition.
+     */
+    private final Condition pausedCleaningCond = lock.newCondition();
+
+    private final Map<String, List<Map<String, String>>> 
gaugeMetricNameWithTag = new HashMap<>();
+
+    private final List<File> logDirs;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+
+    /**
+     * The offset checkpoints holding the last cleaned point for each log.
+     */
+    private volatile Map<File, OffsetCheckpointFile> checkpoints;
+
+    private volatile double dirtiestLogCleanableRatio;
+    private volatile long timeOfLastRun;
+
+    @SuppressWarnings({"this-escape"})
+    public LogCleanerManager(
+            List<File> logDirs,
+            ConcurrentMap<TopicPartition, UnifiedLog> logs,
+            LogDirFailureChannel logDirFailureChannel
+    ) {
+        this.logDirs = logDirs;
+        this.logs = logs;
+        checkpoints = logDirs.stream()
+                .collect(Collectors.toMap(
+                        dir -> dir,
+                        dir -> {
+                            try {
+                                return new OffsetCheckpointFile(new File(dir, 
OFFSET_CHECKPOINT_FILE), logDirFailureChannel);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                ));
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        // gauges for tracking the number of partitions marked as uncleanable 
for each log directory
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME,
+                    () -> inLock(lock, () -> 
uncleanablePartitions.getOrDefault(dir.getAbsolutePath(), Set.of()).size()),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME, 
k -> new ArrayList<>())
+                    .add(metricTag);
+        }
+
+        // gauges for tracking the number of uncleanable bytes from 
uncleanable partitions for each log directory
+        for (File dir : logDirs) {
+            Map<String, String> metricTag = Map.of("logDirectory", 
dir.getAbsolutePath());
+            metricsGroup.newGauge(
+                    UNCLEANABLE_BYTES_METRIC_NAME,
+                    () -> inLock(lock, () -> {
+                        Set<TopicPartition> partitions = 
uncleanablePartitions.get(dir.getAbsolutePath());
+
+                        if (partitions == null) {
+                            return 0;
+                        } else {
+                            Map<TopicPartition, Long> lastClean = 
allCleanerCheckpoints();
+                            long now = Time.SYSTEM.milliseconds();
+                            return partitions.stream()
+                                    .mapToLong(tp -> {
+                                        UnifiedLog log = logs.get(tp);
+                                        if (log != null) {
+                                            Optional<Long> lastCleanOffset = 
Optional.of(lastClean.get(tp));
+                                            OffsetsToClean offsetsToClean = 
cleanableOffsets(log, lastCleanOffset, now);
+                                            return 
calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset(),
+                                                    
offsetsToClean.firstUncleanableDirtyOffset()).getValue();
+                                        } else {
+                                            return 0L;
+                                        }
+                                    }).sum();
+                        }
+                    }),
+                    metricTag
+            );
+
+            gaugeMetricNameWithTag
+                    .computeIfAbsent(UNCLEANABLE_BYTES_METRIC_NAME, k -> new 
ArrayList<>())
+                    .add(metricTag);
+        }
+
+        // a gauge for tracking the cleanable ratio of the dirtiest log
+        dirtiestLogCleanableRatio = 0.0;
+        metricsGroup.newGauge(MAX_DIRTY_PERCENT_METRIC_NAME, () -> (int) (100 
* dirtiestLogCleanableRatio));
+
+        // a gauge for tracking the time since the last log cleaner run, in 
milliseconds
+        timeOfLastRun = Time.SYSTEM.milliseconds();
+        metricsGroup.newGauge(TIME_SINCE_LAST_RUN_MS_METRIC_NAME, () -> 
Time.SYSTEM.milliseconds() - timeOfLastRun);
+    }
+
+    public Map<String, List<Map<String, String>>> gaugeMetricNameWithTag() {
+        return gaugeMetricNameWithTag;
+    }
+
+    /**
+     * @return the position processed for all logs.
+     */
+    public Map<TopicPartition, Long> allCleanerCheckpoints() {
+        return inLock(lock, () -> checkpoints.values().stream()
+                .flatMap(checkpoint -> {
+                    try {
+                        return checkpoint.read().entrySet().stream();
+                    } catch (KafkaStorageException e) {
+                        LOG.error("Failed to access checkpoint file {} in dir 
{}",
+                                checkpoint.file().getName(), 
checkpoint.file().getParentFile().getAbsolutePath(), e);
+                        return Stream.empty();
+                    }
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    }
+
+    /**
+     * Public for unit test. Get the cleaning state of the partition.
+     */
+    public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+        return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
+    }
+
+    /**
+     * Public for unit test. Set the cleaning state of the partition.
+     */
+    public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+        inLock(lock, () -> inProgress.put(tp, state));
+    }
+
+    /**
+     * Choose the log to clean next and add it to the in-progress set. We 
recompute this
+     * each time from the full set of logs to allow logs to be dynamically 
added to the pool of logs
+     * the log manager maintains.
+     */
+    public Optional<LogToClean> grabFilthiestCompactedLog(Time time, 
PreCleanStats preCleanStats) {
+        return inLock(lock, () -> {
+            long now = time.milliseconds();
+            timeOfLastRun = now;
+            Map<TopicPartition, Long> lastClean = allCleanerCheckpoints();
+
+            List<LogToClean> dirtyLogs = logs.entrySet().stream()
+                    .filter(entry -> entry.getValue().config().compact &&
+                            !(inProgress.containsKey(entry.getKey()) || 
isUncleanablePartition(entry.getValue(), entry.getKey()))

Review Comment:
   This is equivalent to the following (by De Morgan's law, `!(a || b)` == `!a && !b`), right? Avoiding an extra level of 
nesting probably makes the code a bit easier to understand.
   
   ```
                        !inProgress.containsKey(entry.getKey()) &&
                        !isUncleanablePartition(entry.getValue(), entry.getKey())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to