mimaison commented on code in PR #19030:
URL: https://github.com/apache/kafka/pull/19030#discussion_r1981225722
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -55,6 +113,2289 @@ public class UnifiedLog {
     public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX;
     public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+    // For compatibility, metrics are defined to be under `Log` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "Log");
+
+    /* A lock that guards all modifications to the log */
+    private final Object lock = new Object();
+    private final Map<String, Map<String, String>> metricNames = new HashMap<>();
+
+    // localLog The LocalLog instance containing non-empty log segments recovered from disk
+    private final LocalLog localLog;
+    private final BrokerTopicStats brokerTopicStats;
+    private final ProducerStateManager producerStateManager;
+    private final boolean remoteStorageSystemEnable;
+    private final ScheduledFuture<?> producerExpireCheck;
+    private final int producerIdExpirationCheckIntervalMs;
+    private final String logIdent;
+    private final Logger logger;
+    private final LogValidator.MetricsRecorder validatorMetricsRecorder;
+
+    /* The earliest offset which is part of an incomplete transaction. This is used to compute the
+     * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
+     * gets removed from the log (through record or segment deletion). In this case, the first unstable offset
+     * will point to the log start offset, which may actually be either part of a completed transaction or not
+     * part of a transaction at all. However, since we only use the LSO for the purpose of restricting the
+     * read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this
+     * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets
+     * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however,
+     * that this could result in disagreement between replicas depending on when they began replicating the log.
+     * In the worst case, the LSO could be seen by a consumer to go backwards.
+     */
+    private volatile Optional<LogOffsetMetadata> firstUnstableOffsetMetadata = Optional.empty();
+    private volatile Optional<PartitionMetadataFile> partitionMetadataFile = Optional.empty();
+    // This is the offset (inclusive) until which segments are copied to the remote storage.
+    private volatile long highestOffsetInRemoteStorage = -1L;
+
+    /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
+     * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
+     * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
+     * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
+     */
+    private volatile LogOffsetMetadata highWatermarkMetadata;
+    private volatile long localLogStartOffset;
+    private volatile long logStartOffset;
+    private volatile LeaderEpochFileCache leaderEpochCache;
+    private volatile Optional<Uuid> topicId;
+    private volatile LogOffsetsListener logOffsetsListener;
+
+    /**
+     * A log which presents a unified view of local and tiered log segments.
+     *
+     * <p>The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an
+     * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments
+     * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local
+     * segments including the active segment.
+     *
+     * <p>NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered
+     * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance.
+     *
+     * @param logStartOffset The earliest offset allowed to be exposed to Kafka clients.
+     *                       The logStartOffset can be updated by:
+     *                       - user's DeleteRecordsRequest
+     *                       - broker's log retention
+     *                       - broker's log truncation
+     *                       - broker's log recovery
+     *                       The logStartOffset is used to decide the following:
+     *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
+     *                         It may trigger log rolling if the active segment is deleted.
+     *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
+     *                         we make sure that logStartOffset <= log's highWatermark
+     *                       Other activities such as log cleaning are not affected by logStartOffset.
+     * @param localLog The LocalLog instance containing non-empty log segments recovered from disk
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
+     * @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
+     *                         with the provided logStartOffset and nextOffsetMetadata
+     * @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
+     * @param topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
+     *                first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
+     *                this field will be populated by reading the topic ID value from partition.metadata if it exists.
+     * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
+     * @param logOffsetsListener listener invoked when the high watermark is updated
+     */
+    @SuppressWarnings({"this-escape"})
+    public UnifiedLog(long logStartOffset,
+                      LocalLog localLog,
+                      BrokerTopicStats brokerTopicStats,
+                      int producerIdExpirationCheckIntervalMs,
+                      LeaderEpochFileCache leaderEpochCache,
+                      ProducerStateManager producerStateManager,
+                      Optional<Uuid> topicId,
+                      boolean remoteStorageSystemEnable,
+                      LogOffsetsListener logOffsetsListener) throws IOException {
+        this.logStartOffset = logStartOffset;
+        this.localLog = localLog;
+        this.brokerTopicStats = brokerTopicStats;
+        this.producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs;
+        this.leaderEpochCache = leaderEpochCache;
+        this.producerStateManager = producerStateManager;
+        this.topicId = topicId;
+        this.remoteStorageSystemEnable = remoteStorageSystemEnable;
+        this.logOffsetsListener = logOffsetsListener;
+
+        this.logIdent = "[UnifiedLog partition=" + localLog.topicPartition() + ", dir=" + localLog.parentDir() + "] ";
+        this.logger = new LogContext(logIdent).logger(UnifiedLog.class);
+        this.highWatermarkMetadata = new LogOffsetMetadata(logStartOffset);
+        this.localLogStartOffset = logStartOffset;
+        this.producerExpireCheck = localLog.scheduler().schedule("PeriodicProducerExpirationCheck", () -> removeExpiredProducers(localLog.time().milliseconds()),

Review Comment:
   I thought I had issues with SpotBugs when calling instance methods in the constructor, but I'm not seeing any failures when making these changes now.
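   For context on the `@SuppressWarnings({"this-escape"})` above: javac's `this-escape` lint (added in JDK 21) warns when a constructor lets `this` escape before the object is fully constructed, e.g. by handing a scheduler a lambda that calls an instance method, which is exactly what the `producerExpireCheck` scheduling does here. SpotBugs has related constructor checks, which may be what the earlier failures were. A minimal standalone sketch of the pattern, with hypothetical names (`ThisEscapeExample`, `removeExpired`), not the actual Kafka code:

   ```java
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   public class ThisEscapeExample {
       private final ScheduledFuture<?> expireCheck;

       @SuppressWarnings("this-escape")
       public ThisEscapeExample(ScheduledExecutorService scheduler, long intervalMs) {
           // The lambda calls an instance method, so it implicitly captures `this`,
           // letting `this` escape to the scheduler before the constructor returns.
           // A task that fires early enough could observe a partially constructed
           // object; javac's -Xlint:this-escape flags this pattern, and the
           // annotation suppresses the warning.
           this.expireCheck = scheduler.scheduleAtFixedRate(
               () -> removeExpired(System.currentTimeMillis()),
               intervalMs, intervalMs, TimeUnit.MILLISECONDS);
       }

       private void removeExpired(long nowMs) {
           // Placeholder for the periodic cleanup work.
       }
   }
   ```

   The usual alternative is to move the scheduling into a separate start() method invoked after construction; for a straight Scala-to-Java port, suppressing the warning keeps the behavior identical.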
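   On the javadoc being moved here: the two retention rules it states (a LogSegment whose nextOffset <= logStartOffset can be deleted, and segments containing offsets at or above the high watermark are never eligible) combine into a predicate roughly like the sketch below. Hypothetical names (`SegmentBounds`, `deletable`), purely illustrative rather than the actual UnifiedLog logic:

   ```java
   /** Illustrative only: a segment covering offsets [baseOffset, nextOffset). */
   record SegmentBounds(long baseOffset, long nextOffset) {}

   final class RetentionSketch {
       /**
        * nextOffset is exclusive, so nextOffset <= logStartOffset means every
        * offset in the segment is below the log start offset, and
        * nextOffset <= highWatermark means the segment contains nothing at or
        * above the high watermark.
        */
       static boolean deletable(SegmentBounds segment, long logStartOffset, long highWatermark) {
           return segment.nextOffset() <= logStartOffset
               && segment.nextOffset() <= highWatermark;
       }
   }
   ```

   Given the invariant logStartOffset <= highWatermark also quoted in the javadoc, the second conjunct is implied by the first; it is written out here only to mirror the two rules as stated.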