wernerdv commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r2038343035


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java:
##########
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * This class holds the actual logic for cleaning a log.
+ */
+public class Cleaner {
+    private final BufferSupplier decompressionBufferSupplier = 
BufferSupplier.create();
+
+    private final Logger logger;
+    private final int id;
+    private final OffsetMap offsetMap;
+    private final int ioBufferSize;
+    private final int maxIoBufferSize;
+    private final double dupBufferLoadFactor;
+    private final Throttler throttler;
+    private final Time time;
+    private final Consumer<TopicPartition> checkDone;
+
+    /**
+     * Buffer used for read i/o
+     */
+    private ByteBuffer readBuffer;
+
+    /**
+     * Buffer used for write i/o
+     */
+    private ByteBuffer writeBuffer;
+
+    /**
+     *
+     * @param id An identifier used for logging
+     * @param offsetMap The map used for deduplication
+     * @param ioBufferSize The size of the buffers to use. Memory usage will 
be 2x this number as there is a read and write buffer.
+     * @param maxIoBufferSize The maximum size of a message that can appear in 
the log
+     * @param dupBufferLoadFactor The maximum percent full for the 
deduplication buffer
+     * @param throttler The throttler instance to use for limiting I/O rate
+     * @param time The time instance
+     * @param checkDone Check if the cleaning for a partition is finished or 
aborted
+     */
+    public Cleaner(int id,
+                   OffsetMap offsetMap,
+                   int ioBufferSize,
+                   int maxIoBufferSize,
+                   double dupBufferLoadFactor,
+                   Throttler throttler,
+                   Time time,
+                   Consumer<TopicPartition> checkDone) {
+        this.id = id;
+        this.offsetMap = offsetMap;
+        this.ioBufferSize = ioBufferSize;
+        this.maxIoBufferSize = maxIoBufferSize;
+        this.dupBufferLoadFactor = dupBufferLoadFactor;
+        this.throttler = throttler;
+        this.time = time;
+        this.checkDone = checkDone;
+        logger = new LogContext("Cleaner " + id + ": 
").logger(LogCleaner.class);
+
+        readBuffer = ByteBuffer.allocate(ioBufferSize);
+        writeBuffer = ByteBuffer.allocate(ioBufferSize);
+
+        assert offsetMap.slots() * dupBufferLoadFactor > 1 :
+                "offset map is too small to fit in even a single message, so 
log cleaning will never make progress. " +
+                        "You can increase log.cleaner.dedupe.buffer.size or 
decrease log.cleaner.threads";
+    }
+
+    public int id() {
+        return id;
+    }
+
+    // Only for testing
+    public OffsetMap offsetMap() {
+        return offsetMap;
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     *
+     * @return The first offset not cleaned and the statistics for this round 
of cleaning
+     */
+    public Map.Entry<Long, CleanerStats> clean(LogToClean cleanable) throws 
IOException, DigestException {
+        return doClean(cleanable, time.milliseconds());
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     * @param currentTime The current timestamp for doing cleaning
+     *
+     * @return The first offset not cleaned and the statistics for this round 
of cleaning
+     * */
+    public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long 
currentTime) throws IOException, DigestException {
+        UnifiedLog log = cleanable.log();
+
+        logger.info("Beginning cleaning of log {}", log.name());
+
+        // figure out the timestamp below which it is safe to remove delete 
tombstones
+        // this position is defined to be a configurable time beneath the last 
modified time of the last clean segment
+        // this timestamp is only used on the older message formats older than 
MAGIC_VALUE_V2
+        List<LogSegment> segments = log.logSegments(0, 
cleanable.firstDirtyOffset());
+        long legacyDeleteHorizonMs = segments.isEmpty()
+                ? 0L
+                : segments.get(segments.size() - 1).lastModified() - 
log.config().deleteRetentionMs;
+
+        CleanerStats stats = new CleanerStats(Time.SYSTEM);
+
+        // build the offset map
+        logger.info("Building offset map for {}...", log.name());
+        long upperBoundOffset = cleanable.firstUncleanableOffset();
+        buildOffsetMap(log, cleanable.firstDirtyOffset(), upperBoundOffset, 
offsetMap, stats);
+        long endOffset = offsetMap.latestOffset() + 1;
+        stats.indexDone();
+
+        // determine the timestamp up to which the log will be cleaned
+        // this is the lower of the last active segment and the compaction lag
+        long cleanableHorizonMs = log.logSegments(0, 
cleanable.firstUncleanableOffset()).stream()
+                .reduce((first, second) -> second)
+                .map(LogSegment::lastModified)
+                .orElse(0L);

Review Comment:
   Fixed



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java:
##########
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.utils.Throttler;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * This class holds the actual logic for cleaning a log.
+ */
+public class Cleaner {
+    private final BufferSupplier decompressionBufferSupplier = 
BufferSupplier.create();
+
+    private final Logger logger;
+    private final int id;
+    private final OffsetMap offsetMap;
+    private final int ioBufferSize;
+    private final int maxIoBufferSize;
+    private final double dupBufferLoadFactor;
+    private final Throttler throttler;
+    private final Time time;
+    private final Consumer<TopicPartition> checkDone;
+
+    /**
+     * Buffer used for read i/o
+     */
+    private ByteBuffer readBuffer;
+
+    /**
+     * Buffer used for write i/o
+     */
+    private ByteBuffer writeBuffer;
+
+    /**
+     *
+     * @param id An identifier used for logging
+     * @param offsetMap The map used for deduplication
+     * @param ioBufferSize The size of the buffers to use. Memory usage will 
be 2x this number as there is a read and write buffer.
+     * @param maxIoBufferSize The maximum size of a message that can appear in 
the log
+     * @param dupBufferLoadFactor The maximum percent full for the 
deduplication buffer
+     * @param throttler The throttler instance to use for limiting I/O rate
+     * @param time The time instance
+     * @param checkDone Check if the cleaning for a partition is finished or 
aborted
+     */
+    public Cleaner(int id,
+                   OffsetMap offsetMap,
+                   int ioBufferSize,
+                   int maxIoBufferSize,
+                   double dupBufferLoadFactor,
+                   Throttler throttler,
+                   Time time,
+                   Consumer<TopicPartition> checkDone) {
+        this.id = id;
+        this.offsetMap = offsetMap;
+        this.ioBufferSize = ioBufferSize;
+        this.maxIoBufferSize = maxIoBufferSize;
+        this.dupBufferLoadFactor = dupBufferLoadFactor;
+        this.throttler = throttler;
+        this.time = time;
+        this.checkDone = checkDone;
+        logger = new LogContext("Cleaner " + id + ": 
").logger(LogCleaner.class);
+
+        readBuffer = ByteBuffer.allocate(ioBufferSize);
+        writeBuffer = ByteBuffer.allocate(ioBufferSize);
+
+        assert offsetMap.slots() * dupBufferLoadFactor > 1 :
+                "offset map is too small to fit in even a single message, so 
log cleaning will never make progress. " +
+                        "You can increase log.cleaner.dedupe.buffer.size or 
decrease log.cleaner.threads";
+    }
+
+    public int id() {
+        return id;
+    }
+
+    // Only for testing
+    public OffsetMap offsetMap() {
+        return offsetMap;
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     *
+     * @return The first offset not cleaned and the statistics for this round 
of cleaning
+     */
+    public Map.Entry<Long, CleanerStats> clean(LogToClean cleanable) throws 
IOException, DigestException {
+        return doClean(cleanable, time.milliseconds());
+    }
+
+    /**
+     * Clean the given log.
+     *
+     * @param cleanable The log to be cleaned
+     * @param currentTime The current timestamp for doing cleaning
+     *
+     * @return The first offset not cleaned and the statistics for this round 
of cleaning
+     * */
+    public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long 
currentTime) throws IOException, DigestException {
+        UnifiedLog log = cleanable.log();
+
+        logger.info("Beginning cleaning of log {}", log.name());
+
+        // figure out the timestamp below which it is safe to remove delete 
tombstones
+        // this position is defined to be a configurable time beneath the last 
modified time of the last clean segment
+        // this timestamp is only used on the older message formats older than 
MAGIC_VALUE_V2
+        List<LogSegment> segments = log.logSegments(0, 
cleanable.firstDirtyOffset());
+        long legacyDeleteHorizonMs = segments.isEmpty()
+                ? 0L
+                : segments.get(segments.size() - 1).lastModified() - 
log.config().deleteRetentionMs;
+
+        CleanerStats stats = new CleanerStats(Time.SYSTEM);
+
+        // build the offset map
+        logger.info("Building offset map for {}...", log.name());
+        long upperBoundOffset = cleanable.firstUncleanableOffset();
+        buildOffsetMap(log, cleanable.firstDirtyOffset(), upperBoundOffset, 
offsetMap, stats);
+        long endOffset = offsetMap.latestOffset() + 1;
+        stats.indexDone();
+
+        // determine the timestamp up to which the log will be cleaned
+        // this is the lower of the last active segment and the compaction lag
+        long cleanableHorizonMs = log.logSegments(0, 
cleanable.firstUncleanableOffset()).stream()
+                .reduce((first, second) -> second)
+                .map(LogSegment::lastModified)
+                .orElse(0L);
+
+        // group the segments and clean the groups
+        logger.info("Cleaning log {} (cleaning prior to {}, discarding 
tombstones prior to upper bound deletion horizon {})...",
+                log.name(), new Date(cleanableHorizonMs), new 
Date(legacyDeleteHorizonMs));
+        CleanedTransactionMetadata transactionMetadata = new 
CleanedTransactionMetadata();
+
+        List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
+                log.logSegments(0, endOffset),
+                log.config().segmentSize,
+                log.config().maxIndexSize,
+                cleanable.firstUncleanableOffset()
+        );
+
+        for (List<LogSegment> group : groupedSegments) {
+            cleanSegments(log, group, offsetMap, currentTime, stats, 
transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
+        }
+
+        // record buffer utilization
+        stats.bufferUtilization = offsetMap.utilization();
+
+        stats.allDone();
+
+        return Map.entry(endOffset, stats);
+    }
+
+    /**
+     * Clean a group of segments into a single replacement segment.
+     *
+     * @param log The log being cleaned
+     * @param segments The group of segments being cleaned
+     * @param map The offset map to use for cleaning segments
+     * @param currentTime The current time in milliseconds
+     * @param stats Collector for cleaning statistics
+     * @param transactionMetadata State of ongoing transactions which is 
carried between the cleaning
+     *                            of the grouped segments
+     * @param legacyDeleteHorizonMs The delete horizon used for tombstones 
whose version is less than 2
+     * @param upperBoundOffsetOfCleaningRound The upper bound offset of this 
round of cleaning
+     */
+    @SuppressWarnings("finally")
+    public void cleanSegments(UnifiedLog log,
+                               List<LogSegment> segments,
+                               OffsetMap map,
+                               long currentTime,
+                               CleanerStats stats,
+                               CleanedTransactionMetadata transactionMetadata,
+                               long legacyDeleteHorizonMs,
+                               long upperBoundOffsetOfCleaningRound) throws 
IOException {
+        // create a new segment with a suffix appended to the name of the log 
and indexes
+        LogSegment cleaned = UnifiedLog.createNewCleanedSegment(log.dir(), 
log.config(), segments.get(0).baseOffset());
+        transactionMetadata.setCleanedIndex(Optional.of(cleaned.txnIndex()));
+
+        try {
+            // clean segments into the new destination segment
+            Iterator<LogSegment> iter = segments.iterator();
+            Optional<LogSegment> currentSegmentOpt = Optional.of(iter.next());
+            Map<Long, LastRecord> lastOffsetOfActiveProducers = 
log.lastRecordsOfActiveProducers();
+
+            while (currentSegmentOpt.isPresent()) {
+                LogSegment currentSegment = currentSegmentOpt.get();
+                Optional<LogSegment> nextSegmentOpt = iter.hasNext() ? 
Optional.of(iter.next()) : Optional.empty();
+
+                // Note that it is important to collect aborted transactions 
from the full log segment
+                // range since we need to rebuild the full transaction index 
for the new segment.
+                long startOffset = currentSegment.baseOffset();
+                long upperBoundOffset = 
nextSegmentOpt.map(LogSegment::baseOffset).orElse(currentSegment.readNextOffset());
+                List<AbortedTxn> abortedTransactions = 
log.collectAbortedTransactions(startOffset, upperBoundOffset);
+                
transactionMetadata.addAbortedTransactions(abortedTransactions);
+
+                boolean retainLegacyDeletesAndTxnMarkers = 
currentSegment.lastModified() > legacyDeleteHorizonMs;
+                logger.info(
+                        "Cleaning {} in log {} into {} with an upper bound 
deletion horizon {} computed from " +
+                        "the segment last modified time of {},{} deletes.",
+                        currentSegment, log.name(), cleaned.baseOffset(), 
legacyDeleteHorizonMs, currentSegment.lastModified(),
+                        retainLegacyDeletesAndTxnMarkers ? "retaining" : 
"discarding"
+                );
+
+                try {
+                    cleanInto(
+                            log.topicPartition(),
+                            currentSegment.log(),
+                            cleaned,
+                            map,
+                            retainLegacyDeletesAndTxnMarkers,
+                            log.config().deleteRetentionMs,
+                            log.config().maxMessageSize(),
+                            transactionMetadata,
+                            lastOffsetOfActiveProducers,
+                            upperBoundOffsetOfCleaningRound,
+                            stats,
+                            currentTime
+                    );
+                } catch (LogSegmentOffsetOverflowException e) {
+                    // Split the current segment. It's also safest to abort 
the current cleaning process, so that we retry from
+                    // scratch once the split is complete.
+                    logger.info("Caught segment overflow error during 
cleaning: {}", e.getMessage());
+                    log.splitOverflowedSegment(currentSegment);
+                    throw new LogCleaningAbortedException();
+                }
+                currentSegmentOpt = nextSegmentOpt;
+            }
+
+            cleaned.onBecomeInactiveSegment();
+            // flush new segment to disk before swap
+            cleaned.flush();
+
+            // update the modification date to retain the last modified date 
of the original files
+            long modified = segments.get(segments.size() - 1).lastModified();
+            cleaned.setLastModified(modified);
+
+            // swap in new segment
+            logger.info("Swapping in cleaned segment {} for segment(s) {} in 
log {}", cleaned, segments, log);
+            log.replaceSegments(List.of(cleaned), segments);
+        } catch (LogCleaningAbortedException e) {
+            try {
+                cleaned.deleteIfExists();
+            } catch (Exception deleteException) {
+                e.addSuppressed(deleteException);
+            } finally {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Clean the given source log segment into the destination segment using 
the key=>offset mapping
+     * provided.
+     *
+     * @param topicPartition The topic and partition of the log segment to 
clean
+     * @param sourceRecords The dirty log segment
+     * @param dest The cleaned log segment
+     * @param map The key=>offset mapping
+     * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+     * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
+     * @param maxLogMessageSize The maximum message size of the corresponding 
topic
+     * @param transactionMetadata The state of ongoing transactions which is 
carried between the cleaning of the grouped segments
+     * @param lastRecordsOfActiveProducers The active producers and its last 
data offset
+     * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in 
the source segment
+     * @param stats Collector for cleaning statistics
+     * @param currentTime The time at which the clean was initiated
+     */
+    private void cleanInto(TopicPartition topicPartition,
+                           FileRecords sourceRecords,
+                           LogSegment dest,
+                           OffsetMap map,
+                           boolean retainLegacyDeletesAndTxnMarkers,
+                           long deleteRetentionMs,
+                           int maxLogMessageSize,
+                           CleanedTransactionMetadata transactionMetadata,
+                           Map<Long, LastRecord> lastRecordsOfActiveProducers,
+                           long upperBoundOffsetOfCleaningRound,
+                           CleanerStats stats,
+                           long currentTime) throws IOException {
+        MemoryRecords.RecordFilter logCleanerFilter = new 
MemoryRecords.RecordFilter(currentTime, deleteRetentionMs) {
+            private boolean discardBatchRecords;
+
+            @Override
+            public BatchRetentionResult checkBatchRetention(RecordBatch batch) 
{
+                // we piggy-back on the tombstone retention logic to delay 
deletion of transaction markers.
+                // note that we will never delete a marker until all the 
records from that transaction are removed.
+                boolean canDiscardBatch = shouldDiscardBatch(batch, 
transactionMetadata);
+
+                if (batch.isControlBatch()) {
+                    discardBatchRecords = canDiscardBatch && 
batch.deleteHorizonMs().isPresent() && batch.deleteHorizonMs().getAsLong() <= 
this.currentTime;
+                } else {
+                    discardBatchRecords = canDiscardBatch;
+                }
+
+                // We retain the batch in order to preserve the state of 
active producers. There are three cases:
+                // 1) The producer is no longer active, which means we can 
delete all records for that producer.
+                // 2) The producer is still active and has a last data offset. 
We retain the batch that contains
+                //    this offset since it also contains the last sequence 
number for this producer.
+                // 3) The last entry in the log is a transaction marker. We 
retain this marker since it has the
+                //    last producer epoch, which is needed to ensure fencing.
+                boolean isBatchLastRecordOfProducer = 
Optional.ofNullable(lastRecordsOfActiveProducers.get(batch.producerId()))
+                        .map(lastRecord -> {
+                            if (lastRecord.lastDataOffset.isPresent()) {
+                                return batch.lastOffset() == 
lastRecord.lastDataOffset.getAsLong();
+                            } else {
+                                return batch.isControlBatch() && 
batch.producerEpoch() == lastRecord.producerEpoch;
+                            }
+                        })
+                        .orElse(false);
+
+                BatchRetention batchRetention;
+                if (batch.hasProducerId() && isBatchLastRecordOfProducer)
+                    batchRetention = BatchRetention.RETAIN_EMPTY;
+                else if (batch.nextOffset() == 
upperBoundOffsetOfCleaningRound) {
+                    // retain the last batch of the cleaning round, even if 
it's empty, so that last offset information
+                    // is not lost after cleaning.
+                    batchRetention = BatchRetention.RETAIN_EMPTY;
+                } else if (discardBatchRecords)
+                    batchRetention = BatchRetention.DELETE;
+                else
+                    batchRetention = BatchRetention.DELETE_EMPTY;
+
+                return new BatchRetentionResult(batchRetention, 
canDiscardBatch && batch.isControlBatch());
+            }
+
+            @Override
+            public boolean shouldRetainRecord(RecordBatch batch, Record 
record) {
+                if (discardBatchRecords) {
+                    // The batch is only retained to preserve producer 
sequence information; the records can be removed
+                    return false;
+                } else if (batch.isControlBatch()) {
+                    return true;
+                } else {
+                    try {
+                        return Cleaner.this.shouldRetainRecord(map, 
retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime);
+                    } catch (DigestException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+
+        int position = 0;
+        while (position < sourceRecords.sizeInBytes()) {
+            checkDone.accept(topicPartition);
+            // read a chunk of messages and copy any that are to be retained 
to the write buffer to be written out
+            readBuffer.clear();
+            writeBuffer.clear();
+
+            sourceRecords.readInto(readBuffer, position);
+            MemoryRecords records = MemoryRecords.readableRecords(readBuffer);
+            throttler.maybeThrottle(records.sizeInBytes());
+            MemoryRecords.FilterResult result = 
records.filterTo(logCleanerFilter, writeBuffer, decompressionBufferSupplier);
+
+            stats.readMessages(result.messagesRead(), result.bytesRead());
+            stats.recopyMessages(result.messagesRetained(), 
result.bytesRetained());
+
+            position += result.bytesRead();
+
+            // if any messages are to be retained, write them out
+            ByteBuffer outputBuffer = result.outputBuffer();
+            if (outputBuffer.position() > 0) {
+                outputBuffer.flip();
+                MemoryRecords retained = 
MemoryRecords.readableRecords(outputBuffer);
+                // it's OK not to hold the Log's lock in this case, because 
this segment is only accessed by other threads
+                // after `Log.replaceSegments` (which acquires the lock) is 
called
+                dest.append(result.maxOffset(), retained);
+                throttler.maybeThrottle(outputBuffer.limit());
+            }
+
+            // if we read bytes but didn't get even one complete batch, our 
I/O buffer is too small, grow it and try again
+            // `result.bytesRead` contains bytes from `messagesRead` and any 
discarded batches.
+            if (readBuffer.limit() > 0 && result.bytesRead() == 0)
+                growBuffersOrFail(sourceRecords, position, maxLogMessageSize, 
records);
+        }
+        restoreBuffers();
+    }
+
+
+    /**
+     * Grow buffers to process next batch of records from `sourceRecords.` 
Buffers are doubled in size
+     * up to a maximum of `maxLogMessageSize`. In some scenarios, a record 
could be bigger than the
+     * current maximum size configured for the log. For example:
+     * <ol>
+     *   <li>A compacted topic using compression may contain a message set 
slightly larger than max.message.bytes</li>
+     *   <li>max.message.bytes of a topic could have been reduced after 
writing larger messages</li>
+     * </ol>
+     * In these cases, grow the buffer to hold the next batch.
+     *
+     * @param sourceRecords The dirty log segment records to process
+     * @param position The current position in the read buffer to read from
+     * @param maxLogMessageSize The maximum record size in bytes for the topic
+     * @param memoryRecords The memory records in read buffer
+     */
+    private void growBuffersOrFail(FileRecords sourceRecords,
+                                   int position,
+                                   int maxLogMessageSize,
+                                   MemoryRecords memoryRecords) throws 
IOException {
+        int maxSize;
+
+        if (readBuffer.capacity() >= maxLogMessageSize) {
+            Integer nextBatchSize = memoryRecords.firstBatchSize();
+            String logDesc = String.format("log segment %s at position %d", 
sourceRecords.file(), position);
+            if (nextBatchSize == null) {
+                throw new IllegalStateException("Could not determine next 
batch size for " + logDesc);
+            }
+            if (nextBatchSize <= 0) {
+                throw new IllegalStateException("Invalid batch size " + 
nextBatchSize + " for " + logDesc);
+            }
+            if (nextBatchSize <= readBuffer.capacity()) {
+                throw new IllegalStateException("Batch size " + nextBatchSize 
+ " < buffer size " + readBuffer.capacity() + ", but not processed for " + 
logDesc);
+            }
+            long bytesLeft = sourceRecords.channel().size() - position;
+            if (nextBatchSize > bytesLeft) {
+                throw new CorruptRecordException("Log segment may be corrupt, 
batch size " + nextBatchSize + " > " + bytesLeft + " bytes left in segment for 
" + logDesc);
+            }
+
+            maxSize = nextBatchSize;
+        } else {
+            maxSize = maxLogMessageSize;
+        }
+
+        growBuffers(maxSize);
+    }
+
+    /**
+     * Check if a batch should be discarded by cleaned transaction state.
+     *
+     * @param batch The batch of records to check
+     * @param transactionMetadata The maintained transaction state about 
cleaning
+     *
+     * @return if the batch can be discarded
+     */
+    private boolean shouldDiscardBatch(RecordBatch batch,
+                                       CleanedTransactionMetadata 
transactionMetadata) {
+        if (batch.isControlBatch())
+            return transactionMetadata.onControlBatchRead(batch);
+        else
+            return transactionMetadata.onBatchRead(batch);
+    }
+
+    /**
+     * Check if a record should be retained.
+     *
+     * @param map The offset map(key=>offset) to use for cleaning segments
+     * @param retainDeletesForLegacyRecords Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+     * @param batch The batch of records that the record belongs to
+     * @param record The record to check
+     * @param stats The collector for cleaning statistics
+     * @param currentTime The current time that used to compare with the 
delete horizon time of the batch when judging a non-legacy record
+     *
+     * @return if the record  can be retained
+     */
+    private boolean shouldRetainRecord(OffsetMap map,
+                                       boolean retainDeletesForLegacyRecords,
+                                       RecordBatch batch,
+                                       Record record,
+                                       CleanerStats stats,
+                                       long currentTime) throws 
DigestException {
+        boolean pastLatestOffset = record.offset() > map.latestOffset();
+        if (pastLatestOffset) {
+            return true;
+        }
+
+        if (record.hasKey()) {
+            ByteBuffer key = record.key();
+            long foundOffset = map.get(key);
+            /* First,the message must have the latest offset for the key
+             * then there are two cases in which we can retain a message:
+             *   1) The message has value
+             *   2) The message doesn't have value but it can't be deleted now.
+             */
+            boolean latestOffsetForKey = record.offset() >= foundOffset;
+            boolean legacyRecord = batch.magic() < RecordBatch.MAGIC_VALUE_V2;
+
+            boolean shouldRetainDeletes;
+            if (!legacyRecord) {
+                shouldRetainDeletes = batch.deleteHorizonMs().isEmpty() || 
currentTime < batch.deleteHorizonMs().getAsLong();
+            } else {
+                shouldRetainDeletes = retainDeletesForLegacyRecords;
+            }
+
+            boolean isRetainedValue = record.hasValue() || shouldRetainDeletes;
+            return latestOffsetForKey && isRetainedValue;
+        } else {
+            stats.invalidMessage();
+            return false;
+        }
+    }
+
+    /**
+     * Double the I/O buffer capacity.
+     *
+     * @param maxLogMessageSize The maximum record size in bytes allowed
+     */
+    private void growBuffers(int maxLogMessageSize) {
+        int maxBufferSize = Math.max(maxLogMessageSize, maxIoBufferSize);
+        if (readBuffer.capacity() >= maxBufferSize || writeBuffer.capacity() 
>= maxBufferSize)
+            throw new IllegalStateException("This log contains a message 
larger than maximum allowable size of " + maxBufferSize + ".");
+        int newSize = Math.min(readBuffer.capacity() * 2, maxBufferSize);
+        logger.info("Growing cleaner I/O buffers from {} bytes to {} bytes.", 
readBuffer.capacity(), newSize);
+        readBuffer = ByteBuffer.allocate(newSize);
+        writeBuffer = ByteBuffer.allocate(newSize);
+    }
+
+    /**
+     * Restore the I/O buffer capacity to its original size.
+     */
+    private void restoreBuffers() {
+        if (readBuffer.capacity() > ioBufferSize)
+            readBuffer = ByteBuffer.allocate(ioBufferSize);
+        if (writeBuffer.capacity() > ioBufferSize)
+            writeBuffer = ByteBuffer.allocate(ioBufferSize);
+    }
+
+    /**
+     * Group the segments in a log into groups totaling less than a given 
size. the size is enforced separately for the log data and the index data.
+     * We collect a group of such segments together into a single destination 
segment.
+     * This prevents segment sizes from shrinking too much.
+     *
+     * @param segments The log segments to group
+     * @param maxSize the maximum size in bytes for the total of all log data 
in a group
+     * @param maxIndexSize the maximum size in bytes for the total of all 
index data in a group
+     * @param firstUncleanableOffset The upper(exclusive) offset to clean to
+     *
+     * @return A list of grouped segments
+     */
+    public List<List<LogSegment>> groupSegmentsBySize(List<LogSegment> 
segments, int maxSize, int maxIndexSize, long firstUncleanableOffset) throws 
IOException {
+        List<List<LogSegment>> grouped = new ArrayList<>();
+
+        while (!segments.isEmpty()) {
+            List<LogSegment> group = new ArrayList<>();
+            group.add(segments.get(0));
+
+            long logSize = segments.get(0).size();
+            long indexSize = segments.get(0).offsetIndex().sizeInBytes();
+            long timeIndexSize = segments.get(0).timeIndex().sizeInBytes();
+
+            segments = segments.subList(1, segments.size());
+
+            while (!segments.isEmpty() &&
+                    logSize + segments.get(0).size() <= maxSize &&
+                    indexSize + segments.get(0).offsetIndex().sizeInBytes() <= 
maxIndexSize &&
+                    timeIndexSize + segments.get(0).timeIndex().sizeInBytes() 
<= maxIndexSize &&
+                    //if first segment size is 0, we don't need to do the 
index offset range check.
+                    //this will avoid empty log left every 2^31 message.
+                    (segments.get(0).size() == 0 ||
+                            lastOffsetForFirstSegment(segments, 
firstUncleanableOffset) - group.get(group.size() - 1).baseOffset() <= 
Integer.MAX_VALUE)) {
+                group.add(0, segments.get(0));
+                logSize += segments.get(0).size();
+                indexSize += segments.get(0).offsetIndex().sizeInBytes();
+                timeIndexSize += segments.get(0).timeIndex().sizeInBytes();
+                segments = segments.subList(1, segments.size());
+            }
+
+            List<LogSegment> reversedGroup = IntStream.range(0, group.size())
+                    .map(i -> group.size() - 1 - i)
+                    .mapToObj(group::get)
+                    .toList();

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to