chia7712 commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r2052025917


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java:
##########
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.config.BrokerReconfigurable;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
+ * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
+ * <p>
+ * Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
+ * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
+ * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
+ * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
+ * <p>
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy
+ * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
+ * <p>
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See {@link OffsetMap} for details of
+ * the implementation of the mapping.
+ * <p>
+ * Once the key=>last_offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
+ * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).
+ * <p>
+ * To avoid segments shrinking to very small sizes with repeated cleanings, we merge successive segments when doing a cleaning
+ * if their combined log and index sizes are less than the maximum log and index size prior to the clean beginning.
+ * <p>
+ * Cleaned segments are swapped into the log as they become available.
+ * <p>
+ * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
+ * <p>
+ * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner.
+ * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
+ * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
+ * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
+ * This time is tracked by setting the base timestamp of a record batch with delete markers when the batch is recopied in the first cleaning that encounters
+ * it. The relative timestamps of the records in the batch are also modified when recopied in this cleaning according to the new base timestamp of the batch.
+ * <p>
+ * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
+ * are the key points:
+ * <p>
+ * <ol>
+ * <li>In order to maintain sequence number continuity for active producers, we always retain the last batch
+ *    from each producerId, even if all the records from the batch have been removed. The batch will be removed
+ *    once the producer either writes a new batch or is expired due to inactivity.</li>
+ * <li>We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have
+ *    been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to
+ *    collect the aborted transactions ahead of time.</li>
+ * <li>Records from aborted transactions are removed by the cleaner immediately without regard to record keys.</li>
+ * <li>Transaction markers are retained until all record batches from the same transaction have been removed and
+ *    a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any
+ *    data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
+ *    tombstone deletion.</li>
+ * </ol>
+ */
+public class LogCleaner implements BrokerReconfigurable {
+    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+            CleanerConfig.LOG_CLEANER_THREADS_PROP,
+            CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
+            CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
+            CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
+            ServerConfigs.MESSAGE_MAX_BYTES_CONFIG,
+            CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
+            CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
+    );
+
+    // Visible for test
+    public static final String MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME = "max-buffer-utilization-percent";
+    public static final String MAX_CLEAN_TIME_METRIC_NAME = "max-clean-time-secs";
+    public static final String MAX_COMPACTION_DELAY_METRICS_NAME = "max-compaction-delay-secs";
+
+    private static final String CLEANER_RECOPY_PERCENT_METRIC_NAME = "cleaner-recopy-percent";
+    private static final String DEAD_THREAD_COUNT_METRIC_NAME = "DeadThreadCount";
+
+    // Visible for test
+    public static final Set<String> METRIC_NAMES = Set.of(
+            MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME,
+            CLEANER_RECOPY_PERCENT_METRIC_NAME,
+            MAX_CLEAN_TIME_METRIC_NAME,
+            MAX_COMPACTION_DELAY_METRICS_NAME,
+            DEAD_THREAD_COUNT_METRIC_NAME
+    );
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleaner` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleaner");
+
+    /**
+     * For managing the state of partitions being cleaned.
+     */
+    private final LogCleanerManager cleanerManager;
+
+    /**
+     * A throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate.
+     */
+    private final Throttler throttler;
+    private final ConcurrentMap<TopicPartition, UnifiedLog> logs;
+    private final LogDirFailureChannel logDirFailureChannel;
+    private final Time time;
+    private final List<CleanerThread> cleaners = new ArrayList<>();
+
+    /**
+     * Log cleaner configuration which may be dynamically updated.
+     */
+    private volatile CleanerConfig config;
+
+    /**
+     * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
+     * @param logDirs The directories where offset checkpoints reside
+     * @param logs The map of logs
+     * @param logDirFailureChannel The channel used to add offline log dirs that may be encountered when cleaning the log
+     * @param time A way to control the passage of time
+     */
+    @SuppressWarnings("this-escape")
+    public LogCleaner(CleanerConfig initialConfig,
+                      List<File> logDirs,
+                      ConcurrentMap<TopicPartition, UnifiedLog> logs,
+                      LogDirFailureChannel logDirFailureChannel,
+                      Time time) {
+        config = initialConfig;
+        this.logs = logs;
+        this.logDirFailureChannel = logDirFailureChannel;
+        cleanerManager = new LogCleanerManager(logDirs, logs, this.logDirFailureChannel);
+        this.time = time;
+        throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", this.time);
+
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
+        metricsGroup.newGauge(MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME,
+                () -> (int) (maxOverCleanerThreads(t -> t.lastStats.bufferUtilization) * 100));
+
+        /* a metric to track the recopy rate of each thread's last cleaning */
+        metricsGroup.newGauge(CLEANER_RECOPY_PERCENT_METRIC_NAME, () -> {
+            List<CleanerStats> stats = cleaners.stream().map(t -> t.lastStats).toList();
+            double recopyRate = (double) stats.stream().mapToLong(stat -> stat.bytesWritten).sum() /
+                    Math.max(stats.stream().mapToLong(stat -> stat.bytesRead).sum(), 1);
+            return (int) (100 * recopyRate);
+        });
+
+        /* a metric to track the maximum cleaning time for the last cleaning from each thread */
+        metricsGroup.newGauge(MAX_CLEAN_TIME_METRIC_NAME, () -> (int) maxOverCleanerThreads(t -> t.lastStats.elapsedSecs()));
+
+        // a metric to track delay between the time when a log is required to be compacted
+        // as determined by max compaction lag and the time of last cleaner run.
+        metricsGroup.newGauge(MAX_COMPACTION_DELAY_METRICS_NAME,
+                () -> (int) (maxOverCleanerThreads(t -> (double) t.lastPreCleanStats.maxCompactionDelayMs()) / 1000));
+
+        metricsGroup.newGauge(DEAD_THREAD_COUNT_METRIC_NAME, this::deadThreadCount);
+    }
+
+    /**
+     * Start the background cleaner threads.
+     */
+    public void startup() {
+        LOG.info("Starting the log cleaner");
+        IntStream.range(0, config.numThreads).forEach(i -> {
+            try {
+                CleanerThread cleaner = new CleanerThread(i);
+                cleaners.add(cleaner);
+                cleaner.start();
+            } catch (NoSuchAlgorithmException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Stop the background cleaner threads.
+     */
+    private void shutdownCleaners() {
+        LOG.info("Shutting down the log cleaner.");
+        cleaners.forEach(thread -> {
+            try {
+                thread.shutdown();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        cleaners.clear();
+    }
+
+    /**
+     * Stop the background cleaner threads.
+     */
+    public void shutdown() {
+        try {
+            shutdownCleaners();
+        } finally {
+            removeMetrics();
+        }
+    }
+
+    /**
+     * Remove metrics.
+     */
+    public void removeMetrics() {
+        METRIC_NAMES.forEach(metricsGroup::removeMetric);
+        cleanerManager.removeMetrics();
+    }
+
+    /**
+     * @return The set of configs that are reconfigurable in LogCleaner
+     */
+    @Override
+    public Set<String> reconfigurableConfigs() {
+        return RECONFIGURABLE_CONFIGS;
+    }
+
+    /**
+     * Validate that the new number of cleaner threads is reasonable.
+     *
+     * @param newConfig A submitted new AbstractConfig instance that contains new cleaner config
+     */
+    @Override
+    public void validateReconfiguration(AbstractConfig newConfig) {
+        int numThreads = new CleanerConfig(newConfig).numThreads;
+        int currentThreads = config.numThreads;
+        if (numThreads < 1)
+            throw new ConfigException("Log cleaner threads should be at least 1");
+        if (numThreads < currentThreads / 2)
+            throw new ConfigException("Log cleaner threads cannot be reduced to less than half the current value " + currentThreads);
+        if (numThreads > currentThreads * 2)
+            throw new ConfigException("Log cleaner threads cannot be increased to more than double the current value " + currentThreads);
+
+    }
+
+    /**
+     * Reconfigure log cleaner config. This will:
+     * <ol>
+     *   <li>update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary</li>
+     *   <li>stop current log cleaners and create new ones.</li>
+     * </ol>
+     * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config.
+     *
+     * @param oldConfig the old log cleaner config
+     * @param newConfig the new log cleaner config reconfigured
+     */
+    @Override
+    public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
+        config = new CleanerConfig(newConfig);
+
+        double maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+        if (maxIoBytesPerSecond != oldConfig.getDouble(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP)) {
+            LOG.info("Updating logCleanerIoMaxBytesPerSecond: {}", maxIoBytesPerSecond);
+            throttler.updateDesiredRatePerSec(maxIoBytesPerSecond);
+        }
+        // call shutdownCleaners() instead of shutdown to avoid unnecessary deletion of metrics
+        shutdownCleaners();
+        startup();
+    }
+
+    /**
+     *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+     *  the partition is aborted.
+     *
+     *  @param topicPartition The topic and partition to abort cleaning
+     */
+    public void abortCleaning(TopicPartition topicPartition) {
+        cleanerManager.abortCleaning(topicPartition);
+    }
+
+    /**
+     * Update checkpoint file to remove partitions if necessary.
+     *
+     * @param dataDir The data dir to be updated if necessary
+     * @param partitionToRemove The topicPartition to be removed
+     */
+    public void updateCheckpoints(File dataDir, Optional<TopicPartition> partitionToRemove) {
+        cleanerManager.updateCheckpoints(dataDir, Optional.empty(), partitionToRemove);
+    }
+
+    /**
+     * Alter the checkpoint directory for the `topicPartition`, to remove the data in `sourceLogDir`, and add the data in `destLogDir`.
+     * Generally occurs when the disk balance ends and replaces the previous file with the future file.
+     *
+     * @param topicPartition The topic and partition to alter checkpoint
+     * @param sourceLogDir The source log dir to remove checkpoint
+     * @param destLogDir The dest log dir to add checkpoint
+     */
+    public void alterCheckpointDir(TopicPartition topicPartition, File sourceLogDir, File destLogDir) {
+        cleanerManager.alterCheckpointDir(topicPartition, sourceLogDir, destLogDir);
+    }
+
+    /**
+     * Stop cleaning logs in the provided directory when handling log dir failure.
+     *
+     * @param dir     the absolute path of the log dir
+     */
+    public void handleLogDirFailure(String dir) {
+        cleanerManager.handleLogDirFailure(dir);
+    }
+
+    /**
+     * Truncate cleaner offset checkpoint for the given partition if its checkpoint offset is larger than the given offset.
+     *
+     * @param dataDir The data dir to be truncated if necessary
+     * @param topicPartition The topic and partition to truncate checkpoint offset
+     * @param offset The given offset to be compared
+     */
+    public void maybeTruncateCheckpoint(File dataDir, TopicPartition topicPartition, long offset) {
+        cleanerManager.maybeTruncateCheckpoint(dataDir, topicPartition, offset);
+    }
+
+    /**
+     *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+     *  This call blocks until the cleaning of the partition is aborted and paused.
+     *
+     *  @param topicPartition The topic and partition to abort and pause cleaning
+     */
+    public void abortAndPauseCleaning(TopicPartition topicPartition) {
+        cleanerManager.abortAndPauseCleaning(topicPartition);
+    }
+
+    /**
+     *  Resume the cleaning of paused partitions.
+     *
+     *  @param topicPartitions The collection of topicPartitions for which cleaning is to be resumed
+     */
+    public void resumeCleaning(Set<TopicPartition> topicPartitions) {
+        cleanerManager.resumeCleaning(topicPartitions);
+    }
+
+    /**
+     * For testing, a way to know when work has completed. This method waits until the
+     * cleaner has processed up to the given offset on the specified topic/partition.
+     *
+     * @param topicPartition The topic and partition to be cleaned
+     * @param offset The first dirty offset that the cleaner doesn't have to clean
+     * @param maxWaitMs The maximum time in ms to wait for cleaner
+     *
+     * @return A boolean indicating whether the work has completed before timeout
+     */
+    public boolean awaitCleaned(TopicPartition topicPartition, long offset, long maxWaitMs) throws InterruptedException {
+        long remainingWaitMs = maxWaitMs;
+        while (!isCleaned(topicPartition, offset) && remainingWaitMs > 0) {
+            long sleepTime = Math.min(100, remainingWaitMs);
+            Thread.sleep(sleepTime);
+            remainingWaitMs -= sleepTime;
+        }
+        return isCleaned(topicPartition, offset);
+    }
+
+    private boolean isCleaned(TopicPartition topicPartition, long offset) {
+        return Optional.ofNullable(cleanerManager.allCleanerCheckpoints().get(topicPartition))
+                .map(checkpoint -> checkpoint >= offset)
+                .orElse(false);
+    }
+
+    /**
+     * To prevent a race between retention and compaction,
+     * retention threads need to make this call to obtain:
+     *
+     * @return A map of log partitions that retention threads can safely work on
+     */
+    public Map<TopicPartition, UnifiedLog> pauseCleaningForNonCompactedPartitions() {
+        return cleanerManager.pauseCleaningForNonCompactedPartitions().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * @param f the function to compute the result for each cleaner thread
+     * @return the max value or 0 if there is no cleaner
+     */
+    public double maxOverCleanerThreads(Function<CleanerThread, Double> f) {
+        return cleaners.stream()
+                .mapToDouble(f::apply)
+                .max()
+                .orElse(0.0d);
+    }
+
+    public int deadThreadCount() {
+        return (int) cleaners.stream()
+                .filter(ShutdownableThread::isThreadFailed)
+                .count();
+    }
+
+    // Only for testing
+    public LogCleanerManager cleanerManager() {
+        return cleanerManager;
+    }
+
+    // Only for testing
+    public Throttler throttler() {
+        return throttler;
+    }
+
+    // Only for testing
+    public ConcurrentMap<TopicPartition, UnifiedLog> logs() {
+        return logs;
+    }
+
+    // Only for testing
+    public List<CleanerThread> cleaners() {
+        return cleaners;
+    }
+
+    // Only for testing
+    public KafkaMetricsGroup metricsGroup() {
+        return metricsGroup;
+    }
+
+    // Only for testing
+    public CleanerConfig currentConfig() {
+        return config;
+    }
+
+    // Only for testing
+    public int cleanerCount() {
+        return cleaners.size();
+    }
+
+    /**
+     * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by
+     * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
+     */
+    public class CleanerThread extends ShutdownableThread {
+        private final Logger logger = new LogContext(logPrefix).logger(CleanerThread.class);
+
+        private final Cleaner cleaner;
+
+        private volatile CleanerStats lastStats = new CleanerStats(Time.SYSTEM);
+        private volatile PreCleanStats lastPreCleanStats = new PreCleanStats();
+
+        @SuppressWarnings("this-escape")
+        public CleanerThread(int threadId) throws NoSuchAlgorithmException {
+            super("kafka-log-cleaner-thread-" + threadId, false);
+
+            cleaner = new Cleaner(
+                    threadId,
+                    new SkimpyOffsetMap((int) Math.min(config.dedupeBufferSize / config.numThreads, Integer.MAX_VALUE), config.hashAlgorithm()),
+                    config.ioBufferSize / config.numThreads / 2,
+                    config.maxMessageSize,
+                    config.dedupeBufferLoadFactor,
+                    throttler,
+                    time,
+                    this::checkDone
+            );
+
+            if (config.dedupeBufferSize / config.numThreads > Integer.MAX_VALUE) {
+                logger.warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...");
+            }
+        }
+
+        /**
+         *  Check if the cleaning for a partition is aborted. If so, throw an exception.
+         *
+         *  @param topicPartition The topic and partition to check
+         */
+        private void checkDone(TopicPartition topicPartition) {
+            if (!isRunning()) {
+                throw new ThreadShutdownException();
+            }
+
+            cleanerManager.checkCleaningAborted(topicPartition);
+        }
+
+        /**
+         * The main loop for the cleaner thread.
+         * Clean a log if there is a dirty log available, otherwise sleep for a bit.
+         */
+        @Override
+        public void doWork() {
+            boolean cleaned = tryCleanFilthiestLog();
+            if (!cleaned) {
+                try {
+                    pause(config.backoffMs, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            cleanerManager.maintainUncleanablePartitions();
+        }
+
+        public CleanerStats lastStats() {
+            return lastStats;
+        }
+
+        public void setLastStats(CleanerStats lastStats) {
+            this.lastStats = lastStats;
+        }
+
+        public PreCleanStats lastPreCleanStats() {
+            return lastPreCleanStats;
+        }
+
+        /**
+         * Cleans a log if there is a dirty log available.
+         *
+         * @return whether a log was cleaned
+         */
+        private boolean tryCleanFilthiestLog() {
+            try {
+                return cleanFilthiestLog();
+            } catch (LogCleaningException e) {
+                logger.warn("Unexpected exception thrown when cleaning log {}. Marking its partition ({}) as uncleanable", e.log, e.log.topicPartition(), e);
+                cleanerManager.markPartitionUncleanable(e.log.parentDir(), e.log.topicPartition());
+
+                return false;
+            }
+        }
+
+        private boolean cleanFilthiestLog() throws LogCleaningException {
+            PreCleanStats preCleanStats = new PreCleanStats();
+            Optional<LogToClean> ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats);
+            boolean cleaned;
+
+            if (ltc.isEmpty()) {
+                cleaned = false;
+            } else {
+                // there's a log, clean it
+                this.lastPreCleanStats = preCleanStats;
+                LogToClean cleanable = null;
+                try {
+                    cleanable = ltc.get();
+                    cleanLog(cleanable);
+                    cleaned = true;
+                } catch (ThreadShutdownException e) {
+                    throw e;
+                } catch (Exception e) {
+                    throw new LogCleaningException(cleanable != null ? cleanable.log() : null, e.getMessage(), e);

Review Comment:
   According to the usage of `LogCleaningException#log`, the `log` can't be null.
   ```java
               try {
                   return cleanFilthiestLog();
               } catch (LogCleaningException e) {
                   logger.warn("Unexpected exception thrown when cleaning log {}. Marking its partition ({}) as uncleanable", e.log, e.log.topicPartition(), e);
                   cleanerManager.markPartitionUncleanable(e.log.parentDir(), e.log.topicPartition());
   
                   return false;
               }
   ```
   
   Additionally, in the path of handling the exception, the `log` can never be null, so we can revise the code.
   ```java
                   LogToClean cleanable = ltc.get();
                   try {
                       cleanLog(cleanable);
                       cleaned = true;
                   } catch (ThreadShutdownException e) {
                       throw e;
                   } catch (Exception e) {
                        throw new LogCleaningException(cleanable.log(), e.getMessage(), e);
                   }
   ```
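   The suggested restructuring can be sketched as a standalone example. The classes below (`Log`, `LogToClean`, `LogCleaningException`, `cleanLog`, `tryClean`) are simplified, hypothetical stand-ins for the real Kafka types, not the actual implementation: hoisting `ltc.get()` out of the `try` means `cleanable` is definitely assigned before the `catch`, so the exception can always carry a non-null log and the `!= null` ternary disappears.

```java
import java.util.Objects;
import java.util.Optional;

// Minimal sketch of the reviewer's suggestion, using hypothetical stand-ins
// for LogToClean / LogCleaningException / Cleaner#clean.
public class CleanableExample {
    static final class Log {
        final String name;
        Log(String name) { this.name = name; }
    }

    static final class LogToClean {
        private final Log log;
        LogToClean(Log log) { this.log = log; }
        Log log() { return log; }
    }

    static final class LogCleaningException extends RuntimeException {
        final Log log;
        LogCleaningException(Log log, String message, Throwable cause) {
            super(message, cause);
            // With the restructured code this invariant always holds.
            this.log = Objects.requireNonNull(log);
        }
    }

    // Stand-in for the real cleaning step; always fails here so the
    // exception-wrapping path is exercised.
    static void cleanLog(LogToClean cleanable) {
        throw new IllegalStateException("simulated cleaning failure");
    }

    static boolean tryClean(Optional<LogToClean> ltc) {
        if (ltc.isEmpty()) {
            return false; // nothing to clean
        }
        LogToClean cleanable = ltc.get(); // hoisted: definitely assigned below
        try {
            cleanLog(cleanable);
            return true;
        } catch (Exception e) {
            // No null check needed: cleanable.log() cannot be null here.
            throw new LogCleaningException(cleanable.log(), e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            tryClean(Optional.of(new LogToClean(new Log("topic-0"))));
        } catch (LogCleaningException e) {
            System.out.println("exception carries log: " + (e.log != null));
        }
    }
}
```

   Running `main` prints `exception carries log: true`, mirroring the caller in `tryCleanFilthiestLog`, which dereferences `e.log` unconditionally.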



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
