mimaison commented on code in PR #19030:
URL: https://github.com/apache/kafka/pull/19030#discussion_r1981217329
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -55,6 +113,2289 @@ public class UnifiedLog {
     public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX;
     public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+    // For compatibility, metrics are defined to be under `Log` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "Log");
+
+    /* A lock that guards all modifications to the log */
+    private final Object lock = new Object();
+    private final Map<String, Map<String, String>> metricNames = new HashMap<>();
+
+    // localLog The LocalLog instance containing non-empty log segments recovered from disk
+    private final LocalLog localLog;
+    private final BrokerTopicStats brokerTopicStats;
+    private final ProducerStateManager producerStateManager;
+    private final boolean remoteStorageSystemEnable;
+    private final ScheduledFuture<?> producerExpireCheck;
+    private final int producerIdExpirationCheckIntervalMs;
+    private final String logIdent;
+    private final Logger logger;
+    private final LogValidator.MetricsRecorder validatorMetricsRecorder;
+
+    /* The earliest offset which is part of an incomplete transaction. This is used to compute the
+     * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
+     * gets removed from the log (through record or segment deletion). In this case, the first unstable offset
+     * will point to the log start offset, which may actually be either part of a completed transaction or not
+     * part of a transaction at all. However, since we only use the LSO for the purpose of restricting the
+     * read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this
+     * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets
+     * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however,
+     * that this could result in disagreement between replicas depending on when they began replicating the log.
+     * In the worst case, the LSO could be seen by a consumer to go backwards.
+     */
+    private volatile Optional<LogOffsetMetadata> firstUnstableOffsetMetadata = Optional.empty();
+    private volatile Optional<PartitionMetadataFile> partitionMetadataFile = Optional.empty();
+    // This is the offset(inclusive) until which segments are copied to the remote storage.
+    private volatile long highestOffsetInRemoteStorage = -1L;
+
+    /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
+     * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
+     * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
+     * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
+     */
+    private volatile LogOffsetMetadata highWatermarkMetadata;
+    private volatile long localLogStartOffset;
+    private volatile long logStartOffset;
+    private volatile LeaderEpochFileCache leaderEpochCache;
+    private volatile Optional<Uuid> topicId;
+    private volatile LogOffsetsListener logOffsetsListener;
+
+    /**
+     * A log which presents a unified view of local and tiered log segments.
+     *
+     * <p>The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an
+     * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments
+     * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local
+     * segments including the active segment.
+     *
+     * <p>NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered
+     * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance.
+     *
+     * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
+     *                       The logStartOffset can be updated by :
+     *                       - user's DeleteRecordsRequest
+     *                       - broker's log retention
+     *                       - broker's log truncation
+     *                       - broker's log recovery
+     *                       The logStartOffset is used to decide the following:
+     *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
+     *                         It may trigger log rolling if the active segment is deleted.
+     *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
+     *                         we make sure that logStartOffset <= log's highWatermark
+     *                       Other activities such as log cleaning are not affected by logStartOffset.
+     * @param localLog The LocalLog instance containing non-empty log segments recovered from disk
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
+     * @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
+     *                         with the provided logStartOffset and nextOffsetMetadata
+     * @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
+     * @param topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
+     *                first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
+     *                this field will be populated by reading the topic ID value from partition.metadata if it exists.
+     * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
+     * @param logOffsetsListener listener invoked when the high watermark is updated
+     */
+    @SuppressWarnings({"this-escape"})
+    public UnifiedLog(long logStartOffset,
+                      LocalLog localLog,
+                      BrokerTopicStats brokerTopicStats,
+                      int producerIdExpirationCheckIntervalMs,
+                      LeaderEpochFileCache leaderEpochCache,
+                      ProducerStateManager producerStateManager,
+                      Optional<Uuid> topicId,
+                      boolean remoteStorageSystemEnable,
+                      LogOffsetsListener logOffsetsListener) throws IOException {
+        this.logStartOffset = logStartOffset;
+        this.localLog = localLog;
+        this.brokerTopicStats = brokerTopicStats;
+        this.producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs;
+        this.leaderEpochCache = leaderEpochCache;
+        this.producerStateManager = producerStateManager;
+        this.topicId = topicId;
+        this.remoteStorageSystemEnable = remoteStorageSystemEnable;
+        this.logOffsetsListener = logOffsetsListener;
+
+        this.logIdent = "[UnifiedLog partition=" + localLog.topicPartition() + ", dir=" + localLog.parentDir() + "] ";
+        this.logger = new LogContext(logIdent).logger(UnifiedLog.class);
+        this.highWatermarkMetadata = new LogOffsetMetadata(logStartOffset);
+        this.localLogStartOffset = logStartOffset;
+        this.producerExpireCheck = localLog.scheduler().schedule("PeriodicProducerExpirationCheck", () -> removeExpiredProducers(localLog.time().milliseconds()),
+                producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs);
+        this.validatorMetricsRecorder = UnifiedLog.newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats());
+
+        initializePartitionMetadata();
+        updateLogStartOffset(logStartOffset);
+        updateLocalLogStartOffset(Math.max(logStartOffset, localLog.segments().firstSegmentBaseOffset().orElse(0L)));
+        if (!remoteLogEnabled())
+            this.logStartOffset = localLogStartOffset;
+        maybeIncrementFirstUnstableOffset();
+        initializeTopicId();
+
+        logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset);
+        newMetrics();
+    }
+
+    /**
+     * Create a new UnifiedLog instance
+     * @param dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param logStartOffset The checkpoint of the log start offset
+     * @param recoveryPoint The checkpoint of the offset at which to begin the recovery
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param time The time instance used for checking the clock
+     * @param maxTransactionTimeoutMs The timeout in milliseconds for transactions
+     * @param producerStateManagerConfig The configuration for creating the ProducerStateManager instance
+     * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
+     * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+     * @param lastShutdownClean Boolean flag to indicate whether the associated log previously had a clean shutdown
+     * @param topicId optional Uuid to specify the topic ID for the topic if it exists
+     * @throws IOException if an I/O error occurs
+     */
+    public static UnifiedLog create(File dir,
+                                    LogConfig config,
+                                    long logStartOffset,
+                                    long recoveryPoint,
+                                    Scheduler scheduler,
+                                    BrokerTopicStats brokerTopicStats,
+                                    Time time,
+                                    int maxTransactionTimeoutMs,
+                                    ProducerStateManagerConfig producerStateManagerConfig,
+                                    int producerIdExpirationCheckIntervalMs,
+                                    LogDirFailureChannel logDirFailureChannel,
+                                    boolean lastShutdownClean,
+                                    Optional<Uuid> topicId) throws IOException {
+        return create(dir,
+                config,
+                logStartOffset,
+                recoveryPoint,
+                scheduler,
+                brokerTopicStats,
+                time,
+                maxTransactionTimeoutMs,
+                producerStateManagerConfig,
+                producerIdExpirationCheckIntervalMs,
+                logDirFailureChannel,
+                lastShutdownClean,
+                topicId,
+                new ConcurrentHashMap<>(),
+                false,
+                LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+    }
+
+    /**
+     * Create a new UnifiedLog instance
+     * @param dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param logStartOffset The checkpoint of the log start offset
+     * @param recoveryPoint The checkpoint of the offset at which to begin the recovery
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param time The time instance used for checking the clock
+     * @param maxTransactionTimeoutMs The timeout in milliseconds for transactions
+     * @param producerStateManagerConfig The configuration for creating the ProducerStateManager instance
+     * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
+     * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+     * @param lastShutdownClean Boolean flag to indicate whether the associated log previously had a clean shutdown
+     * @param topicId Optional Uuid to specify the topic ID for the topic if it exists
+     * @param numRemainingSegments The remaining segments to be recovered in this log keyed by recovery thread name
+     * @param remoteStorageSystemEnable Boolean flag to indicate whether the system level remote log storage is enabled or not.
+     * @param logOffsetsListener listener invoked when the high watermark is updated
+     * @throws IOException if an I/O error occurs
+     */
+    public static UnifiedLog create(File dir,
+                                    LogConfig config,
+                                    long logStartOffset,
+                                    long recoveryPoint,
+                                    Scheduler scheduler,
+                                    BrokerTopicStats brokerTopicStats,
+                                    Time time,
+                                    int maxTransactionTimeoutMs,
+                                    ProducerStateManagerConfig producerStateManagerConfig,
+                                    int producerIdExpirationCheckIntervalMs,
+                                    LogDirFailureChannel logDirFailureChannel,
+                                    boolean lastShutdownClean,
+                                    Optional<Uuid> topicId,
+                                    ConcurrentMap<String, Integer> numRemainingSegments,
+                                    boolean remoteStorageSystemEnable,
+                                    LogOffsetsListener logOffsetsListener) throws IOException {
+        // create the log directory if it doesn't exist
+        Files.createDirectories(dir.toPath());
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(dir);
+        LogSegments segments = new LogSegments(topicPartition);
+        // The created leaderEpochCache will be truncated by LogLoader if necessary
+        // so it is guaranteed that the epoch entries will be correct even when on-disk
+        // checkpoint was stale (due to async nature of LeaderEpochFileCache#truncateFromStart/End).
+        LeaderEpochFileCache leaderEpochCache = UnifiedLog.createLeaderEpochCache(
+                dir,
+                topicPartition,
+                logDirFailureChannel,
+                Optional.empty(),
+                scheduler);
+        ProducerStateManager producerStateManager = new ProducerStateManager(
+                topicPartition,
+                dir,
+                maxTransactionTimeoutMs,
+                producerStateManagerConfig,
+                time);
+        boolean isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic());
+        LoadedLogOffsets offsets = new LogLoader(
+                dir,
+                topicPartition,
+                config,
+                scheduler,
+                time,
+                logDirFailureChannel,
+                lastShutdownClean,
+                segments,
+                logStartOffset,
+                recoveryPoint,
+                leaderEpochCache,
+                producerStateManager,
+                numRemainingSegments,
+                isRemoteLogEnabled
+        ).load();
+        LocalLog localLog = new LocalLog(
+                dir,
+                config,
+                segments,
+                offsets.recoveryPoint,
+                offsets.nextOffsetMetadata,
+                scheduler,
+                time,
+                topicPartition,
+                logDirFailureChannel);
+        return new UnifiedLog(offsets.logStartOffset,
+                localLog,
+                brokerTopicStats,
+                producerIdExpirationCheckIntervalMs,
+                leaderEpochCache,
+                producerStateManager,
+                topicId,
+                remoteStorageSystemEnable,
+                logOffsetsListener);
+    }
+
+    public long localLogStartOffset() {
+        return localLogStartOffset;
+    }
+
+    public LeaderEpochFileCache leaderEpochCache() {
+        return leaderEpochCache;
+    }
+
+    public long logStartOffset() {
+        return logStartOffset;
+    }
+
+    long highestOffsetInRemoteStorage() {
+        return highestOffsetInRemoteStorage;
+    }
+
+    public Optional<PartitionMetadataFile> partitionMetadataFile() {
+        return partitionMetadataFile;
+    }
+
+    public Optional<Uuid> topicId() {
+        return topicId;
+    }
+
+    public File dir() {
+        return localLog.dir();
+    }
+
+    public String parentDir() {
+        return localLog.parentDir();
+    }
+
+    public File parentDirFile() {
+        return localLog.parentDirFile();
+    }
+
+    public String name() {
+        return localLog.name();
+    }
+
+    public long recoveryPoint() {
+        return localLog.recoveryPoint();
+    }
+
+    public TopicPartition topicPartition() {
+        return localLog.topicPartition();
+    }
+
+    public LogDirFailureChannel logDirFailureChannel() {
+        return localLog.logDirFailureChannel();
+    }
+
+    public LogConfig config() {
+        return localLog.config();
+    }
+
+    public boolean remoteLogEnabled() {
+        return UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config(), topicPartition().topic());
+    }
+
+    public ScheduledFuture<?> producerExpireCheck() {
+        return producerExpireCheck;
+    }
+
+    public int producerIdExpirationCheckIntervalMs() {
+        return producerIdExpirationCheckIntervalMs;
+    }
+
+    public void updateLogStartOffsetFromRemoteTier(long remoteLogStartOffset) {
+        if (!remoteLogEnabled()) {
+            logger.error("Ignoring the call as the remote log storage is disabled");
+            return;
+        }
+        maybeIncrementLogStartOffset(remoteLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion);
+    }
+
+    public void updateLocalLogStartOffset(long offset) throws IOException {
+        localLogStartOffset = offset;
+        if (highWatermark() < offset) {
+            updateHighWatermark(offset);
+        }
+        if (recoveryPoint() < offset) {
+            localLog.updateRecoveryPoint(offset);
+        }
+    }
+
+    public void setLogOffsetsListener(LogOffsetsListener listener) {
+        logOffsetsListener = listener;
+    }
+
+    /**
+     * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
+     * Set _topicId based on a few scenarios:
+     *   - Recover topic ID if present. Ensure we do not try to assign a provided topicId that is inconsistent
+     *     with the ID on file.
+     *   - If we were provided a topic ID when creating the log and one does not yet exist
+     *     set _topicId and write to the partition metadata file.
+     */
+    private void initializeTopicId() {
+        PartitionMetadataFile partMetadataFile = partitionMetadataFile.orElseThrow(() ->
+                new KafkaException("The partitionMetadataFile should have been initialized"));
+
+        if (partMetadataFile.exists()) {
+            Uuid fileTopicId = partMetadataFile.read().topicId();
+            if (topicId.isPresent() && !topicId.get().equals(fileTopicId)) {
+                throw new InconsistentTopicIdException("Tried to assign topic ID " + topicId + " to log for topic partition " + topicPartition() + "," +
+                        "but log already contained topic ID " + fileTopicId);
+            }
+            topicId = Optional.of(fileTopicId);
+        } else {
+            topicId.ifPresent(partMetadataFile::record);
+            scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile);
+        }
+    }
+
+    public LogConfig updateConfig(LogConfig newConfig) {
+        LogConfig oldConfig = localLog.config();
+        localLog.updateConfig(newConfig);
+        return oldConfig;
+    }
+
+    public long highWatermark() {
+        return highWatermarkMetadata.messageOffset;
+    }
+
+    public ProducerStateManager producerStateManager() {
+        return producerStateManager;
+    }
+
+    private Time time() {
+        return localLog.time();
+    }
+
+    private Scheduler scheduler() {
+        return localLog.scheduler();
+    }
+
+    /**
+     * Update the high watermark to a new offset. The new high watermark will be lower-bounded by the log start offset
+     * and upper-bounded by the log end offset.
+     *
+     * <p>This is intended to be called by the leader when initializing the high watermark.
+     *
+     * @param hw the suggested new value for the high watermark
+     * @return the updated high watermark offset
+     */
+    public long updateHighWatermark(long hw) throws IOException {
+        return updateHighWatermark(new LogOffsetMetadata(hw));
+    }
+
+    /**
+     * Update high watermark with offset metadata. The new high watermark will be lower-bounded by the log start offset
+     * and upper-bounded by the log end offset.
+     *
+     * @param highWatermarkMetadata the suggested high watermark with offset metadata
+     * @return the updated high watermark offset
+     */
+    public long updateHighWatermark(LogOffsetMetadata highWatermarkMetadata) throws IOException {
+        LogOffsetMetadata endOffsetMetadata = localLog.logEndOffsetMetadata();
+        LogOffsetMetadata newHighWatermarkMetadata = highWatermarkMetadata.messageOffset < logStartOffset
+                ? new LogOffsetMetadata(logStartOffset)
+                : highWatermarkMetadata.messageOffset >= endOffsetMetadata.messageOffset
+                        ? endOffsetMetadata
+                        : highWatermarkMetadata;
+
+        updateHighWatermarkMetadata(newHighWatermarkMetadata);
+        return newHighWatermarkMetadata.messageOffset;
+    }
+
+    /**
+     * Update the high watermark to a new value if and only if it is larger than the old value. It is
+     * an error to update to a value which is larger than the log end offset.
+     *
+     * <p>This method is intended to be used by the leader to update the high watermark after follower
+     * fetch offsets have been updated.
+     *
+     * @return the old high watermark, if updated by the new value
+     */
+    public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(LogOffsetMetadata newHighWatermark) throws IOException {
+        if (newHighWatermark.messageOffset > logEndOffset()) {
+            throw new IllegalArgumentException("High watermark " + newHighWatermark + " update exceeds current " +
+                    "log end offset " + localLog.logEndOffsetMetadata());
+        }
+
+        synchronized (lock) {
+            LogOffsetMetadata oldHighWatermark = fetchHighWatermarkMetadata();
+            // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+            // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+            if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+                    (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
+                updateHighWatermarkMetadata(newHighWatermark);
+                return Optional.of(oldHighWatermark);
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    /**
+     * Update high watermark with a new value. The new high watermark will be lower-bounded by the log start offset
+     * and upper-bounded by the log end offset.
+     *
+     * <p>This method is intended to be used by the follower to update its high watermark after
+     * replication from the leader.
+     *
+     * @return the new high watermark if the high watermark changed, None otherwise.
+     */
+    public Optional<Long> maybeUpdateHighWatermark(long hw) throws IOException {
+        synchronized (lock) {
+            LogOffsetMetadata oldHighWatermark = highWatermarkMetadata;
+            long newHighWatermark = updateHighWatermark(new LogOffsetMetadata(hw));
+            return (newHighWatermark == oldHighWatermark.messageOffset)
+                    ? Optional.empty()
+                    : Optional.of(newHighWatermark);
+        }
+    }
+
+
+    /**
+     * Get the offset and metadata for the current high watermark. If offset metadata is not
+     * known, this will do a lookup in the index and cache the result.
+     */
+    private LogOffsetMetadata fetchHighWatermarkMetadata() throws IOException {
+        localLog.checkIfMemoryMappedBufferClosed();
+        LogOffsetMetadata offsetMetadata = highWatermarkMetadata;
+        if (offsetMetadata.messageOffsetOnly()) {
+            synchronized (lock) {
+                LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(highWatermark());
+                updateHighWatermarkMetadata(fullOffset);
+                return fullOffset;
+            }
+        } else {
+            return offsetMetadata;
+        }
+    }
+
+    private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) throws IOException {
+        if (newHighWatermark.messageOffset < 0) {
+            throw new IllegalArgumentException("High watermark offset should be non-negative");
+        }
+
+        synchronized (lock) {
+            if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
+                logger.warn("Non-monotonic update of high watermark from {} to {}", highWatermarkMetadata, newHighWatermark);
+            }
+            highWatermarkMetadata = newHighWatermark;
+            producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset);
+            logOffsetsListener.onHighWatermarkUpdated(newHighWatermark.messageOffset);
+            maybeIncrementFirstUnstableOffset();
+        }
+        logger.trace("Setting high watermark {}", newHighWatermark);
+    }
+
+    /**
+     * Get the first unstable offset. Unlike the last stable offset, which is always defined,
+     * the first unstable offset only exists if there are transactions in progress.
+     *
+     * @return the first unstable offset, if it exists
+     */
+    public Optional<Long> firstUnstableOffset() {
+        return firstUnstableOffsetMetadata.map(uom -> uom.messageOffset);
+    }
+
+    private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException {
+        localLog.checkIfMemoryMappedBufferClosed();
+
+        // cache the current high watermark to avoid a concurrent update invalidating the range check
+        LogOffsetMetadata highWatermarkMetadata = fetchHighWatermarkMetadata();
+        if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermarkMetadata.messageOffset) {
+            LogOffsetMetadata lom = firstUnstableOffsetMetadata.get();
+            if (lom.messageOffsetOnly()) {
+                synchronized (lock) {
+                    LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(lom.messageOffset);
+                    if (firstUnstableOffsetMetadata.get().equals(lom))

Review Comment:
   Yes, that condition should always be true, so we should be able to remove it.
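   For illustration, a minimal sketch of how the inner block might read with that equality check dropped, as suggested above. This is not the actual patch: the quoted hunk is cut off right at the condition, so the assumption here is that the omitted body caches the converted offset in `firstUnstableOffsetMetadata` and returns it, mirroring `fetchHighWatermarkMetadata()` earlier in the diff.

   ```java
   // Hypothetical simplification, not the PR's code: lom is read from firstUnstableOffsetMetadata
   // just before entering the synchronized block, so re-checking equality is assumed redundant.
   if (lom.messageOffsetOnly()) {
       synchronized (lock) {
           LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(lom.messageOffset);
           // Cache the full offset metadata unconditionally and return it
           // (assumed continuation of the cut-off body).
           firstUnstableOffsetMetadata = Optional.of(fullOffset);
           return fullOffset;
       }
   } else {
       return lom;
   }
   ```

   The simplification only holds if no concurrent writer can replace `firstUnstableOffsetMetadata` between the read of `lom` and acquiring the lock, which is what would make the removed check always true.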