chia7712 commented on code in PR #19387: URL: https://github.com/apache/kafka/pull/19387#discussion_r3178914412
########## storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java: ########## @@ -0,0 +1,766 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.utils.Throttler; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.DigestException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import 
java.util.function.Consumer; + +/** + * This class holds the actual logic for cleaning a log. + */ +public class Cleaner { + private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create(); + + private final Logger logger; + private final int id; + private final OffsetMap offsetMap; + private final int ioBufferSize; + private final int maxIoBufferSize; + private final double dupBufferLoadFactor; + private final Throttler throttler; + private final Time time; + private final Consumer<TopicPartition> checkDone; + + /** + * Buffer used for read i/o + */ + private ByteBuffer readBuffer; + + /** + * Buffer used for write i/o + */ + private ByteBuffer writeBuffer; + + /** + * + * @param id An identifier used for logging + * @param offsetMap The map used for deduplication + * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer. + * @param maxIoBufferSize The maximum size of a message that can appear in the log + * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer + * @param throttler The throttler instance to use for limiting I/O rate + * @param time The time instance + * @param checkDone Check if the cleaning for a partition is finished or aborted + */ + public Cleaner(int id, + OffsetMap offsetMap, + int ioBufferSize, + int maxIoBufferSize, + double dupBufferLoadFactor, + Throttler throttler, + Time time, + Consumer<TopicPartition> checkDone) { + this.id = id; + this.offsetMap = offsetMap; + this.ioBufferSize = ioBufferSize; + this.maxIoBufferSize = maxIoBufferSize; + this.dupBufferLoadFactor = dupBufferLoadFactor; + this.throttler = throttler; + this.time = time; + this.checkDone = checkDone; + logger = new LogContext("Cleaner " + id + ": ").logger(Cleaner.class); + + readBuffer = ByteBuffer.allocate(ioBufferSize); + writeBuffer = ByteBuffer.allocate(ioBufferSize); + + assert offsetMap.slots() * dupBufferLoadFactor > 1 : + "offset map is too small 
to fit in even a single message, so log cleaning will never make progress. " + + "You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads"; + } + + public int id() { + return id; + } + + // Only for testing + public OffsetMap offsetMap() { + return offsetMap; + } + + /** + * Clean the given log. + * + * @param cleanable The log to be cleaned + * + * @return The first offset not cleaned and the statistics for this round of cleaning + */ + public Map.Entry<Long, CleanerStats> clean(LogToClean cleanable) throws IOException, DigestException { + return doClean(cleanable, time.milliseconds()); + } + + /** + * Clean the given log. + * + * @param cleanable The log to be cleaned + * @param currentTime The current timestamp for doing cleaning + * + * @return The first offset not cleaned and the statistics for this round of cleaning + * */ + public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentTime) throws IOException, DigestException { + UnifiedLog log = cleanable.log(); + + logger.info("Beginning cleaning of log {}", log.name()); + + // figure out the timestamp below which it is safe to remove delete tombstones + // this position is defined to be a configurable time beneath the last modified time of the last clean segment + // this timestamp is only used on the older message formats older than MAGIC_VALUE_V2 + List<LogSegment> segments = log.logSegments(0, cleanable.firstDirtyOffset()); + long legacyDeleteHorizonMs = segments.isEmpty() + ? 
0L + : segments.get(segments.size() - 1).lastModified() - log.config().deleteRetentionMs; + + CleanerStats stats = new CleanerStats(Time.SYSTEM); + + // build the offset map + logger.info("Building offset map for {}...", log.name()); + long upperBoundOffset = cleanable.firstUncleanableOffset(); + buildOffsetMap(log, cleanable.firstDirtyOffset(), upperBoundOffset, offsetMap, stats); + long endOffset = offsetMap.latestOffset() + 1; + stats.indexDone(); + + // determine the timestamp up to which the log will be cleaned + // this is the lower of the last active segment and the compaction lag + segments = log.logSegments(0, cleanable.firstUncleanableOffset()); + long cleanableHorizonMs = segments.isEmpty() + ? 0L + : segments.get(segments.size() - 1).lastModified(); + + // group the segments and clean the groups + logger.info("Cleaning log {} (cleaning prior to {}, discarding tombstones prior to upper bound deletion horizon {})...", + log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)); + CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata(); + + List<List<LogSegment>> groupedSegments = groupSegmentsBySize( + log.logSegments(0, endOffset), + log.config().segmentSize, + log.config().maxIndexSize, + cleanable.firstUncleanableOffset() + ); + + for (List<LogSegment> group : groupedSegments) { + cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset); + } + + // record buffer utilization + stats.bufferUtilization = offsetMap.utilization(); + + stats.allDone(); + + return Map.entry(endOffset, stats); + } + + /** + * Clean a group of segments into a single replacement segment. 
+ * + * @param log The log being cleaned + * @param segments The group of segments being cleaned + * @param map The offset map to use for cleaning segments + * @param currentTime The current time in milliseconds + * @param stats Collector for cleaning statistics + * @param transactionMetadata State of ongoing transactions which is carried between the cleaning + * of the grouped segments + * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2 + * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning + */ + @SuppressWarnings("finally") + public void cleanSegments(UnifiedLog log, + List<LogSegment> segments, + OffsetMap map, + long currentTime, + CleanerStats stats, + CleanedTransactionMetadata transactionMetadata, + long legacyDeleteHorizonMs, + long upperBoundOffsetOfCleaningRound) throws IOException { + // create a new segment with a suffix appended to the name of the log and indexes + LogSegment cleaned = UnifiedLog.createNewCleanedSegment(log.dir(), log.config(), segments.get(0).baseOffset()); + transactionMetadata.setCleanedIndex(Optional.of(cleaned.txnIndex())); + + try { + // clean segments into the new destination segment + Iterator<LogSegment> iter = segments.iterator(); + Optional<LogSegment> currentSegmentOpt = Optional.of(iter.next()); + Map<Long, LastRecord> lastOffsetOfActiveProducers = log.lastRecordsOfActiveProducers(); + + while (currentSegmentOpt.isPresent()) { + LogSegment currentSegment = currentSegmentOpt.get(); + Optional<LogSegment> nextSegmentOpt = iter.hasNext() ? Optional.of(iter.next()) : Optional.empty(); + + // Note that it is important to collect aborted transactions from the full log segment + // range since we need to rebuild the full transaction index for the new segment. 
+ long startOffset = currentSegment.baseOffset(); + long upperBoundOffset = nextSegmentOpt.map(LogSegment::baseOffset).orElse(currentSegment.readNextOffset()); Review Comment: We missed the lazy evaluation here: `Optional.orElse` evaluates its argument eagerly, so `currentSegment.readNextOffset()` (which performs a seek) runs for every segment, even when the next segment's base offset is available — `orElseGet` would defer it. While it's not a major regression, it's definitely worth fixing. See #22200 for details. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
