chia7712 commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r3178914412


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java:
##########
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * This class holds the actual logic for cleaning a log.
+ */
+public class Cleaner {
+    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
+
+    private final Logger logger;
+    private final int id;
+    private final OffsetMap offsetMap;
+    private final int ioBufferSize;
+    private final int maxIoBufferSize;
+    private final double dupBufferLoadFactor;
+    private final Throttler throttler;
+    private final Time time;
+    private final Consumer<TopicPartition> checkDone;
+
+    /**
+     * Buffer used for read i/o
+     */
+    private ByteBuffer readBuffer;
+
+    /**
+     * Buffer used for write i/o
+     */
+    private ByteBuffer writeBuffer;
+
+    /**
+     *
+     * @param id An identifier used for logging
+     * @param offsetMap The map used for deduplication
+     * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+     * @param maxIoBufferSize The maximum size of a message that can appear in the log
+     * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer
+     * @param throttler The throttler instance to use for limiting I/O rate
+     * @param time The time instance
+     * @param checkDone Check if the cleaning for a partition is finished or aborted
+     */
+    public Cleaner(int id,
+                   OffsetMap offsetMap,
+                   int ioBufferSize,
+                   int maxIoBufferSize,
+                   double dupBufferLoadFactor,
+                   Throttler throttler,
+                   Time time,
+                   Consumer<TopicPartition> checkDone) {
+        this.id = id;
+        this.offsetMap = offsetMap;
+        this.ioBufferSize = ioBufferSize;
+        this.maxIoBufferSize = maxIoBufferSize;
+        this.dupBufferLoadFactor = dupBufferLoadFactor;
+        this.throttler = throttler;
+        this.time = time;
+        this.checkDone = checkDone;
+        logger = new LogContext("Cleaner " + id + ": ").logger(Cleaner.class);
+
+        readBuffer = ByteBuffer.allocate(ioBufferSize);
+        writeBuffer = ByteBuffer.allocate(ioBufferSize);
+
+        assert offsetMap.slots() * dupBufferLoadFactor > 1 :
+                "offset map is too small to fit in even a single message, so 
log cleaning will never make progress. " +
+                        "You can increase log.cleaner.dedupe.buffer.size or 
decrease log.cleaner.threads";
+    }
+
+    public int id() {
+        return id;
+    }
+
+    // Only for testing
+    public OffsetMap offsetMap() {
+        return offsetMap;
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     *
+     * @return The first offset not cleaned and the statistics for this round of cleaning
+     */
+    public Map.Entry<Long, CleanerStats> clean(LogToClean cleanable) throws 
IOException, DigestException {
+        return doClean(cleanable, time.milliseconds());
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     * @param currentTime The current timestamp for doing cleaning
+     *
+     * @return The first offset not cleaned and the statistics for this round of cleaning
+     */
+    public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentTime) throws IOException, DigestException {
+        UnifiedLog log = cleanable.log();
+
+        logger.info("Beginning cleaning of log {}", log.name());
+
+        // figure out the timestamp below which it is safe to remove delete tombstones
+        // this position is defined to be a configurable time beneath the last modified time of the last clean segment
+        // this timestamp is only used on message formats older than MAGIC_VALUE_V2
+        List<LogSegment> segments = log.logSegments(0, cleanable.firstDirtyOffset());
+        long legacyDeleteHorizonMs = segments.isEmpty()
+                ? 0L
+                : segments.get(segments.size() - 1).lastModified() - log.config().deleteRetentionMs;
+
+        CleanerStats stats = new CleanerStats(Time.SYSTEM);
+
+        // build the offset map
+        logger.info("Building offset map for {}...", log.name());
+        long upperBoundOffset = cleanable.firstUncleanableOffset();
+        buildOffsetMap(log, cleanable.firstDirtyOffset(), upperBoundOffset, offsetMap, stats);
+        long endOffset = offsetMap.latestOffset() + 1;
+        stats.indexDone();
+
+        // determine the timestamp up to which the log will be cleaned
+        // this is the lower of the last active segment and the compaction lag
+        segments = log.logSegments(0, cleanable.firstUncleanableOffset());
+        long cleanableHorizonMs = segments.isEmpty()
+                ? 0L
+                : segments.get(segments.size() - 1).lastModified();
+
+        // group the segments and clean the groups
+        logger.info("Cleaning log {} (cleaning prior to {}, discarding 
tombstones prior to upper bound deletion horizon {})...",
+                log.name(), new Date(cleanableHorizonMs), new 
Date(legacyDeleteHorizonMs));
+        CleanedTransactionMetadata transactionMetadata = new 
CleanedTransactionMetadata();
+
+        List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
+                log.logSegments(0, endOffset),
+                log.config().segmentSize,
+                log.config().maxIndexSize,
+                cleanable.firstUncleanableOffset()
+        );
+
+        for (List<LogSegment> group : groupedSegments) {
+            cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
+        }
+
+        // record buffer utilization
+        stats.bufferUtilization = offsetMap.utilization();
+
+        stats.allDone();
+
+        return Map.entry(endOffset, stats);
+    }
+
+    /**
+     * Clean a group of segments into a single replacement segment.
+     *
+     * @param log The log being cleaned
+     * @param segments The group of segments being cleaned
+     * @param map The offset map to use for cleaning segments
+     * @param currentTime The current time in milliseconds
+     * @param stats Collector for cleaning statistics
+     * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
+     *                            of the grouped segments
+     * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
+     * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning
+     */
+    @SuppressWarnings("finally")
+    public void cleanSegments(UnifiedLog log,
+                               List<LogSegment> segments,
+                               OffsetMap map,
+                               long currentTime,
+                               CleanerStats stats,
+                               CleanedTransactionMetadata transactionMetadata,
+                               long legacyDeleteHorizonMs,
+                               long upperBoundOffsetOfCleaningRound) throws IOException {
+        // create a new segment with a suffix appended to the name of the log and indexes
+        LogSegment cleaned = UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), segments.get(0).baseOffset());
+        transactionMetadata.setCleanedIndex(Optional.of(cleaned.txnIndex()));
+
+        try {
+            // clean segments into the new destination segment
+            Iterator<LogSegment> iter = segments.iterator();
+            Optional<LogSegment> currentSegmentOpt = Optional.of(iter.next());
+            Map<Long, LastRecord> lastOffsetOfActiveProducers = 
log.lastRecordsOfActiveProducers();
+
+            while (currentSegmentOpt.isPresent()) {
+                LogSegment currentSegment = currentSegmentOpt.get();
+                Optional<LogSegment> nextSegmentOpt = iter.hasNext() ? 
Optional.of(iter.next()) : Optional.empty();
+
+                // Note that it is important to collect aborted transactions from the full log segment
+                // range since we need to rebuild the full transaction index for the new segment.
+                long startOffset = currentSegment.baseOffset();
+                long upperBoundOffset = nextSegmentOpt.map(LogSegment::baseOffset).orElse(currentSegment.readNextOffset());

Review Comment:
   We missed the lazy evaluation here: `orElse` always evaluates its argument, so `currentSegment.readNextOffset()` runs (and seeks to the end of the segment) for every segment, even when the next segment's base offset is already available. While it's not a major regression, it's definitely worth fixing. See #22200 for details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
