mimaison commented on code in PR #19387: URL: https://github.com/apache/kafka/pull/19387#discussion_r2031926411
########## build.gradle: ########## @@ -3737,6 +3737,7 @@ project(':connect:mirror') { testImplementation project(':core') testImplementation project(':test-common:test-common-runtime') testImplementation project(':server') + testImplementation project(':server-common') Review Comment: Do we need to keep the line below? I think `testImplementation project(':server-common')` already includes the test output. ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -1407,7 +1408,7 @@ class LogManager(logDirs: Seq[File], } } finally { if (cleaner != null) { - cleaner.resumeCleaning(deletableLogs.map(_._1)) + cleaner.resumeCleaning(deletableLogs.map(_._1).toList.asJava) Review Comment: It's unfortunate that we have to convert either the output of `pauseCleaningForNonCompactedPartitions()` or `currentLogs` from Java to Scala collections to build `deletableLogs`, only to convert it back to Java here. Can we use a Java collection throughout? ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -1384,7 +1385,7 @@ class LogManager(logDirs: Seq[File], val deletableLogs = { if (cleaner != null) { // prevent cleaner from working on same partitions when changing cleanup policy - cleaner.pauseCleaningForNonCompactedPartitions() + cleaner.pauseCleaningForNonCompactedPartitions().asScala.map(entry => (entry.getKey, entry.getValue)) Review Comment: Do we actually need to return `List<Map.Entry<TopicPartition, UnifiedLog>>` from `cleaner.pauseCleaningForNonCompactedPartitions()`? The reason to do so would be to allow multiple entries for the same `TopicPartition`, but looking at the code in `LogCleanerManager`, that does not seem possible.
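To make the last two suggestions concrete, here is a minimal, hypothetical Java sketch of a `pauseCleaningForNonCompactedPartitions`-style method that returns a `Map` keyed by partition instead of a `List<Map.Entry<...>>`, so the caller can stay in Java collections end to end. It assumes `String` stands in for `TopicPartition` and a small `FakeLog` record stands in for `UnifiedLog`; `PauseCleaningSketch`, `FakeLog` and `partitionsToResume` are illustrative names, not Kafka APIs, and the actual pausing logic in `LogCleanerManager` is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: String stands in for TopicPartition, FakeLog for UnifiedLog.
public class PauseCleaningSketch {

    // Illustrative stand-in for a log; only the cleanup policy matters here.
    record FakeLog(String name, boolean compacted) { }

    // Keyed by partition, so two entries for the same partition are impossible
    // by construction, unlike a List<Map.Entry<K, V>>.
    static Map<String, FakeLog> pauseCleaningForNonCompactedPartitions(Map<String, FakeLog> currentLogs) {
        Map<String, FakeLog> paused = new LinkedHashMap<>();
        currentLogs.forEach((tp, log) -> {
            if (!log.compacted()) {
                // A real implementation would also mark the partition as paused here.
                paused.put(tp, log);
            }
        });
        return Collections.unmodifiableMap(paused);
    }

    // The caller can build the "resume" argument directly from the key set,
    // without a round trip through Scala collections.
    static List<String> partitionsToResume(Map<String, FakeLog> paused) {
        return new ArrayList<>(paused.keySet());
    }

    public static void main(String[] args) {
        Map<String, FakeLog> current = new LinkedHashMap<>();
        current.put("topic-0", new FakeLog("a", true));
        current.put("topic-1", new FakeLog("b", false));
        Map<String, FakeLog> paused = pauseCleaningForNonCompactedPartitions(current);
        System.out.println(partitionsToResume(paused)); // prints [topic-1]
    }
}
```

Because a map cannot hold two entries for the same key, this shape would also document the one-entry-per-partition behaviour that the review comment suggests `LogCleanerManager` already has.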
########## core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala: ########## @@ -133,10 +133,10 @@ abstract class AbstractLogCleanerIntegrationTest { backoffMs, true) new LogCleaner(cleanerConfig, - logDirs = Array(logDir), - logs = logMap, - logDirFailureChannel = new LogDirFailureChannel(1), - time = time) + java.util.List.of(logDir), Review Comment: We usually import `java.util` when using this from Scala (that is, `import java.util` and then `util.List.of(logDir)`). ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java: ########## @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.config.BrokerReconfigurable; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.util.ShutdownableThread; +import org.apache.kafka.storage.internals.utils.Throttler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.security.DigestException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.IntStream; + +/** + * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. + * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'. + * <p> + * Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a + * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section. + * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a + * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable. 
+ * <p> + * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy + * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. + * <p> + * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See {@link OffsetMap} for details of + * the implementation of the mapping. + * <p> + * Once the key=>last_offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a + * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log). + * <p> + * To avoid segments shrinking to very small sizes with repeated cleanings we implement a rule by which if we will merge successive segments when + * doing a cleaning if their log and index size are less than the maximum log and index size prior to the clean beginning. + * <p> + * Cleaned segments are swapped into the log as they become available. + * <p> + * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted. + * <p> + * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. + * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic + * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). + * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning. 
+ * This time is tracked by setting the base timestamp of a record batch with delete markers when the batch is recopied in the first cleaning that encounters + * it. The relative timestamps of the records in the batch are also modified when recopied in this cleaning according to the new base timestamp of the batch. + * <p> + * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following + * are the key points: + * <p> + * <ol> + * <li>In order to maintain sequence number continuity for active producers, we always retain the last batch + * from each producerId, even if all the records from the batch have been removed. The batch will be removed + * once the producer either writes a new batch or is expired due to inactivity.</li> + * <li>We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have + * been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to + * collect the aborted transactions ahead of time.</li> + * <li>Records from aborted transactions are removed by the cleaner immediately without regard to record keys.</li> + * <li>Transaction markers are retained until all record batches from the same transaction have been removed and + * a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any + * data from the transaction prior to reaching the offset of the marker. 
This follows the same logic used for + * tombstone deletion.</li> + * </ol> + */ +public class LogCleaner implements BrokerReconfigurable { + private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); + + public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of( + CleanerConfig.LOG_CLEANER_THREADS_PROP, + CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, + CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, + CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, + ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, + CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, + CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP + ); + + // Visible for test + public static final String MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME = "max-buffer-utilization-percent"; + public static final String MAX_CLEAN_TIME_METRIC_NAME = "max-clean-time-secs"; + public static final String MAX_COMPACTION_DELAY_METRICS_NAME = "max-compaction-delay-secs"; + + private static final String CLEANER_RECOPY_PERCENT_METRIC_NAME = "cleaner-recopy-percent"; + private static final String DEAD_THREAD_COUNT_METRIC_NAME = "DeadThreadCount"; + + // Visible for test + public static final Set<String> METRIC_NAMES = Set.of( + MAX_BUFFER_UTILIZATION_PERCENT_METRIC_NAME, + CLEANER_RECOPY_PERCENT_METRIC_NAME, + MAX_CLEAN_TIME_METRIC_NAME, + MAX_COMPACTION_DELAY_METRICS_NAME, + DEAD_THREAD_COUNT_METRIC_NAME + ); + + // For compatibility, metrics are defined to be under `kafka.log.LogCleaner` class + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleaner"); + + /** + * For managing the state of partitions being cleaned. + */ + private final LogCleanerManager cleanerManager; + + /** + * A throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate. 
+ */ + private final Throttler throttler; + + private final ConcurrentMap<TopicPartition, UnifiedLog> logs; + private final LogDirFailureChannel logDirFailureChannel; + private final Time time; + Review Comment: nit: We can remove the extra blank lines. Same just above. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java: ########## @@ -0,0 +1,652 @@ +public class LogCleaner implements BrokerReconfigurable { + private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); Review Comment: `kafka.log.LogCleaner` is one of the few classes that has a logger defined in our default `log4j2.yaml` file. So we should either use the `LoggerFactory.getLogger()` method that takes a `String` and keep `kafka.log.LogCleaner` or update our `log4j2.yaml` file.
I'm unsure if changing the package can be considered a breaking change for this class. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set + * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This + * class is responsible for deciding when transaction markers can be removed and is therefore also responsible + * for updating the cleaned transaction index accordingly. 
+ */ +public class CleanedTransactionMetadata { + private final Set<Long> ongoingCommittedTxns = new HashSet<>(); + private final Map<Long, AbortedTransactionMetadata> ongoingAbortedTxns = new HashMap<>(); + + /** + * Minheap of aborted transactions sorted by the transaction first offset + */ + private final PriorityQueue<AbortedTxn> abortedTransactions = new PriorityQueue<>( + Comparator.comparingLong(AbortedTxn::firstOffset) + ); + + /** + * Output cleaned index to write retained aborted transactions + */ + Optional<TransactionIndex> cleanedIndex = Optional.empty(); + + /** + * Update the cleaned transaction state with the new found aborted transactions that has just been traversed. + * + * @param abortedTransactions The new found aborted transactions to add + */ + public void addAbortedTransactions(List<AbortedTxn> abortedTransactions) { + this.abortedTransactions.addAll(abortedTransactions); + } + + /** + * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner. + * Return true if the control batch can be discarded. + * + * @param controlBatch The control batch that been traversed + * + * @return True if the control batch can be discarded + */ + public boolean onControlBatchRead(RecordBatch controlBatch) { + consumeAbortedTxnsUpTo(controlBatch.lastOffset()); + + Iterator<Record> controlRecordIterator = controlBatch.iterator(); + if (controlRecordIterator.hasNext()) { + Record controlRecord = controlRecordIterator.next(); + ControlRecordType controlType = ControlRecordType.parse(controlRecord.key()); + long producerId = controlBatch.producerId(); + + switch (controlType) { + case ABORT: + AbortedTransactionMetadata abortedTxnMetadata = ongoingAbortedTxns.remove(producerId); + + // Retain the marker until all batches from the transaction have been removed. 
+ if (abortedTxnMetadata != null && abortedTxnMetadata.lastObservedBatchOffset.isPresent()) { + cleanedIndex.ifPresent(index -> { + try { + index.append(abortedTxnMetadata.abortedTxn); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return false; + } + + return true; + + case COMMIT: + // This marker is eligible for deletion if we didn't traverse any batches from the transaction + return !ongoingCommittedTxns.remove(producerId); + + default: + return false; + } + } else { + // An empty control batch was already cleaned, so it's safe to discard + return true; + } + } + + private void consumeAbortedTxnsUpTo(long offset) { + while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) { + AbortedTxn abortedTxn = abortedTransactions.poll(); + if (abortedTxn != null) { + ongoingAbortedTxns.computeIfAbsent(abortedTxn.producerId(), id -> new AbortedTransactionMetadata(abortedTxn)); + } + } + } + + /** + * Update the transactional state for the incoming non-control batch. If the batch is part of + * an aborted transaction, return true to indicate that it is safe to discard. 
+ * + * @param batch The batch to read when updating the transactional state + * + * @return Whether the batch is part of an aborted transaction or not + */ + public boolean onBatchRead(RecordBatch batch) { + consumeAbortedTxnsUpTo(batch.lastOffset()); + if (batch.isTransactional()) { + Optional<AbortedTransactionMetadata> metadata = Optional.ofNullable(ongoingAbortedTxns.get(batch.producerId())); + + if (metadata.isPresent()) { + metadata.get().lastObservedBatchOffset = Optional.of(batch.lastOffset()); + return true; + } else { + ongoingCommittedTxns.add(batch.producerId()); + return false; + } + } else { + return false; + } + } + + private static class AbortedTransactionMetadata { + Optional<Long> lastObservedBatchOffset = Optional.empty(); + final AbortedTxn abortedTxn; + + public AbortedTransactionMetadata(AbortedTxn abortedTxn) { + this.abortedTxn = abortedTxn; + } + + @Override + public String toString() { Review Comment: Is this method needed? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java: ########## @@ -0,0 +1,160 @@ +public class CleanedTransactionMetadata { + private final Set<Long> ongoingCommittedTxns = new HashSet<>(); + private final Map<Long, AbortedTransactionMetadata> ongoingAbortedTxns = new HashMap<>(); + + /** + * Minheap of aborted transactions sorted by the transaction first offset + */ + private final PriorityQueue<AbortedTxn> abortedTransactions = new PriorityQueue<>( + Comparator.comparingLong(AbortedTxn::firstOffset) + ); + + /** + * Output cleaned index to write retained aborted transactions + */ + Optional<TransactionIndex> cleanedIndex = Optional.empty(); Review Comment: Can we put this field with the other ones?
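The aborted-transaction bookkeeping in `CleanedTransactionMetadata` (a min-heap ordered by first offset, drained into a per-producer map as the cleaner advances through the log) can be illustrated in isolation. The following is a simplified, hypothetical Java model, not the class from the PR: `AbortedTxnTrackerSketch` and `AbortedTxnInfo` are illustrative names (the latter stands in for `AbortedTxn`), a batch is reduced to its producer ID, last offset and transactional flag, and control-batch handling and the cleaned transaction index are left out.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical, simplified model of the batch-side logic; not the Kafka class.
public class AbortedTxnTrackerSketch {

    // Stand-in for AbortedTxn: which producer aborted, and where the transaction began.
    record AbortedTxnInfo(long producerId, long firstOffset) { }

    // Min-heap of aborted transactions ordered by their first offset.
    private final PriorityQueue<AbortedTxnInfo> abortedTransactions =
        new PriorityQueue<>(Comparator.comparingLong(AbortedTxnInfo::firstOffset));
    private final Map<Long, AbortedTxnInfo> ongoingAbortedTxns = new HashMap<>();
    private final Set<Long> ongoingCommittedTxns = new HashSet<>();

    void addAbortedTransactions(List<AbortedTxnInfo> txns) {
        abortedTransactions.addAll(txns);
    }

    // Move every aborted transaction that starts at or before `offset`
    // from the heap into the per-producer "ongoing" map.
    private void consumeAbortedTxnsUpTo(long offset) {
        while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) {
            AbortedTxnInfo txn = abortedTransactions.poll();
            ongoingAbortedTxns.putIfAbsent(txn.producerId(), txn);
        }
    }

    // Returns true if the batch belongs to an aborted transaction and may be discarded.
    boolean onBatchRead(long producerId, long lastOffset, boolean isTransactional) {
        consumeAbortedTxnsUpTo(lastOffset);
        if (!isTransactional) {
            return false;
        }
        if (ongoingAbortedTxns.containsKey(producerId)) {
            return true;
        }
        ongoingCommittedTxns.add(producerId);
        return false;
    }
}
```

For example, with one aborted transaction for producer 7 starting at offset 10, a transactional batch from producer 7 ending at offset 5 is kept (the abort has not been reached yet), while one ending at offset 12 is reported as discardable. The heap lets the tracker pick up aborted transactions lazily, in offset order, as the cleaner reaches them.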