mimaison commented on code in PR #19030:
URL: https://github.com/apache/kafka/pull/19030#discussion_r1981604968

@@ -55,6 +113,2289 @@ public class UnifiedLog {
     public static final String STRAY_DIR_SUFFIX = 
     public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
+    // For compatibility, metrics are defined to be under `Log` class
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "Log");
+    /* A lock that guards all modifications to the log */
+    private final Object lock = new Object();
+    private final Map<String, Map<String, String>> metricNames = new 
+    // localLog The LocalLog instance containing non-empty log segments 
recovered from disk
+    private final LocalLog localLog;
+    private final BrokerTopicStats brokerTopicStats;
+    private final ProducerStateManager producerStateManager;
+    private final boolean remoteStorageSystemEnable;
+    private final ScheduledFuture<?> producerExpireCheck;
+    private final int producerIdExpirationCheckIntervalMs;
+    private final String logIdent;
+    private final Logger logger;
+    private final LogValidator.MetricsRecorder validatorMetricsRecorder;
+    /* The earliest offset which is part of an incomplete transaction. This is 
used to compute the
+     * last stable offset (LSO) in ReplicaManager. Note that it is possible 
that the "true" first unstable offset
+     * gets removed from the log (through record or segment deletion). In this 
case, the first unstable offset
+     * will point to the log start offset, which may actually be either part 
of a completed transaction or not
+     * part of a transaction at all. However, since we only use the LSO for 
the purpose of restricting the
+     * read_committed consumer to fetching decided data (i.e. committed, 
aborted, or non-transactional), this
+     * temporary abuse seems justifiable and saves us from scanning the log 
after deletion to find the first offsets
+     * of each ongoing transaction in order to compute a new first unstable 
offset. It is possible, however,
+     * that this could result in disagreement between replicas depending on 
when they began replicating the log.
+     * In the worst case, the LSO could be seen by a consumer to go backwards.
+     */
+    private volatile Optional<LogOffsetMetadata> firstUnstableOffsetMetadata = 
+    private volatile Optional<PartitionMetadataFile> partitionMetadataFile = 
+    // This is the offset(inclusive) until which segments are copied to the 
remote storage.
+    private volatile long highestOffsetInRemoteStorage = -1L;
+    /* Keep track of the current high watermark in order to ensure that 
segments containing offsets at or above it are
+     * not eligible for deletion. This means that the active segment is only 
eligible for deletion if the high watermark
+     * equals the log end offset (which may never happen for a partition under 
consistent load). This is needed to
+     * prevent the log start offset (which is exposed in fetch responses) from 
getting ahead of the high watermark.
+     */
+    private volatile LogOffsetMetadata highWatermarkMetadata;
+    private volatile long localLogStartOffset;
+    private volatile long logStartOffset;
+    private volatile LeaderEpochFileCache leaderEpochCache;
+    private volatile Optional<Uuid> topicId;
+    private volatile LogOffsetsListener logOffsetsListener;
+    /**
+     * A log which presents a unified view of local and tiered log segments.
+     *
+     * <p>The log consists of tiered and local segments with the tiered 
portion of the log being optional. There could be an
+     * overlap between the tiered and local segments. The active segment is 
always guaranteed to be local. If tiered segments
+     * are present, they always appear at the beginning of the log, followed 
by an optional region of overlap, followed by the local
+     * segments including the active segment.
+     *
+     * <p>NOTE: this class handles state and behavior specific to tiered 
segments as well as any behavior combining both tiered
+     * and local segments. The state and behavior specific to local segments 
are handled by the encapsulated LocalLog instance.
+     *
+     * @param logStartOffset The earliest offset allowed to be exposed to 
kafka client.
+     *                       The logStartOffset can be updated by :
+     *                       - user's DeleteRecordsRequest
+     *                       - broker's log retention
+     *                       - broker's log truncation
+     *                       - broker's log recovery
+     *                       The logStartOffset is used to decide the 
+     *                       - Log deletion. LogSegment whose nextOffset <= 
log's logStartOffset can be deleted.
+     *                         It may trigger log rolling if the active 
segment is deleted.
+     *                       - Earliest offset of the log in response to 
ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to 
earliest offset,
+     *                         we make sure that logStartOffset <= log's 
+     *                       Other activities such as log cleaning are not 
affected by logStartOffset.
+     * @param localLog The LocalLog instance containing non-empty log segments 
recovered from disk
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param producerIdExpirationCheckIntervalMs How often to check for 
producer ids which need to be expired
+     * @param leaderEpochCache The LeaderEpochFileCache instance (if any) 
containing state associated
+     *                         with the provided logStartOffset and 
+     * @param producerStateManager The ProducerStateManager instance 
containing state associated with the provided segments
+     * @param topicId optional Uuid to specify the topic ID for the topic if 
it exists. Should only be specified when
+     *                first creating the log through Partition.makeLeader or 
Partition.makeFollower. When reloading a log,
+     *                this field will be populated by reading the topic ID 
value from partition.metadata if it exists.
+     * @param remoteStorageSystemEnable flag to indicate whether the system 
level remote log storage is enabled or not.
+     * @param logOffsetsListener listener invoked when the high watermark is 
+     */
+    @SuppressWarnings({"this-escape"})
+    public UnifiedLog(long logStartOffset,
+                      LocalLog localLog,
+                      BrokerTopicStats brokerTopicStats,
+                      int producerIdExpirationCheckIntervalMs,
+                      LeaderEpochFileCache leaderEpochCache,
+                      ProducerStateManager producerStateManager,
+                      Optional<Uuid> topicId,
+                      boolean remoteStorageSystemEnable,
+                      LogOffsetsListener logOffsetsListener) throws 
IOException {
+        this.logStartOffset = logStartOffset;
+        this.localLog = localLog;
+        this.brokerTopicStats = brokerTopicStats;
+        this.producerIdExpirationCheckIntervalMs = 
+        this.leaderEpochCache = leaderEpochCache;
+        this.producerStateManager = producerStateManager;
+        this.topicId = topicId;
+        this.remoteStorageSystemEnable = remoteStorageSystemEnable;
+        this.logOffsetsListener = logOffsetsListener;
+        this.logIdent = "[UnifiedLog partition=" + localLog.topicPartition() + 
", dir=" + localLog.parentDir() + "] ";
+        this.logger = new LogContext(logIdent).logger(UnifiedLog.class);
+        this.highWatermarkMetadata = new LogOffsetMetadata(logStartOffset);
+        this.localLogStartOffset = logStartOffset;
+        this.producerExpireCheck = 
localLog.scheduler().schedule("PeriodicProducerExpirationCheck", () -> 
+                producerIdExpirationCheckIntervalMs, 
+        this.validatorMetricsRecorder = 
+        initializePartitionMetadata();
+        updateLogStartOffset(logStartOffset);
+        updateLocalLogStartOffset(Math.max(logStartOffset, 
+        if (!remoteLogEnabled())
+            this.logStartOffset = localLogStartOffset;
+        maybeIncrementFirstUnstableOffset();
+        initializeTopicId();
+        newMetrics();
+    }
+    /**
+     * Create a new UnifiedLog instance
+     * @param dir dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param logStartOffset The checkpoint of the log start offset
+     * @param recoveryPoint The checkpoint of the offset at which to begin the 
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param time The time instance used for checking the clock
+     * @param maxTransactionTimeoutMs The timeout in milliseconds for 
+     * @param producerStateManagerConfig The configuration for creating the 
ProducerStateManager instance
+     * @param producerIdExpirationCheckIntervalMs How often to check for 
producer ids which need to be expired
+     * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+     * @param lastShutdownClean Boolean flag to indicate whether the 
associated log previously had a clean shutdown
+     * @param topicId optional Uuid to specify the topic ID for the topic if 
it exists
+     * @throws IOException if an I/O error occurs
+     */
+    public static UnifiedLog create(File dir,
+                                    LogConfig config,
+                                    long logStartOffset,
+                                    long recoveryPoint,
+                                    Scheduler scheduler,
+                                    BrokerTopicStats brokerTopicStats,
+                                    Time time,
+                                    int maxTransactionTimeoutMs,
+                                    ProducerStateManagerConfig 
+                                    int producerIdExpirationCheckIntervalMs,
+                                    LogDirFailureChannel logDirFailureChannel,
+                                    boolean lastShutdownClean,
+                                    Optional<Uuid> topicId) throws IOException 
+        return create(dir,
+                config,
+                logStartOffset,
+                recoveryPoint,
+                scheduler,
+                brokerTopicStats,
+                time,
+                maxTransactionTimeoutMs,
+                producerStateManagerConfig,
+                producerIdExpirationCheckIntervalMs,
+                logDirFailureChannel,
+                lastShutdownClean,
+                topicId,
+                new ConcurrentHashMap<>(),
+                false,
+                LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+    }
+    /**
+     * Create a new UnifiedLog instance
+     * @param dir dir The directory in which log segments are created.
+     * @param config The log configuration settings
+     * @param logStartOffset The checkpoint of the log start offset
+     * @param recoveryPoint The checkpoint of the offset at which to begin the 
+     * @param scheduler The thread pool scheduler used for background actions
+     * @param brokerTopicStats Container for Broker Topic Yammer Metrics
+     * @param time The time instance used for checking the clock
+     * @param maxTransactionTimeoutMs The timeout in milliseconds for 
+     * @param producerStateManagerConfig The configuration for creating the 
ProducerStateManager instance
+     * @param producerIdExpirationCheckIntervalMs How often to check for 
producer ids which need to be expired
+     * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+     * @param lastShutdownClean Boolean flag to indicate whether the 
associated log previously had a clean shutdown
+     * @param topicId Optional Uuid to specify the topic ID for the topic if 
it exists
+     * @param numRemainingSegments The remaining segments to be recovered in 
this log keyed by recovery thread name
+     * @param remoteStorageSystemEnable Boolean flag to indicate whether the 
system level remote log storage is enabled or not.
+     * @param logOffsetsListener listener invoked when the high watermark is 
+     * @throws IOException if an I/O error occurs
+     */
+    public static UnifiedLog create(File dir,
+                                    LogConfig config,
+                                    long logStartOffset,
+                                    long recoveryPoint,
+                                    Scheduler scheduler,
+                                    BrokerTopicStats brokerTopicStats,
+                                    Time time,
+                                    int maxTransactionTimeoutMs,
+                                    ProducerStateManagerConfig 
+                                    int producerIdExpirationCheckIntervalMs,
+                                    LogDirFailureChannel logDirFailureChannel,
+                                    boolean lastShutdownClean,
+                                    Optional<Uuid> topicId,
+                                    ConcurrentMap<String, Integer> 
+                                    boolean remoteStorageSystemEnable,
+                                    LogOffsetsListener logOffsetsListener) 
throws IOException {
+        // create the log directory if it doesn't exist
+        Files.createDirectories(dir.toPath());
+        TopicPartition topicPartition = 
+        LogSegments segments = new LogSegments(topicPartition);
+        // The created leaderEpochCache will be truncated by LogLoader if 
+        // so it is guaranteed that the epoch entries will be correct even 
when on-disk
+        // checkpoint was stale (due to async nature of 
+        LeaderEpochFileCache leaderEpochCache = 
+                dir,
+                topicPartition,
+                logDirFailureChannel,
+                Optional.empty(),
+                scheduler);
+        ProducerStateManager producerStateManager = new ProducerStateManager(
+                topicPartition,
+                dir,
+                maxTransactionTimeoutMs,
+                producerStateManagerConfig,
+                time);
+        boolean isRemoteLogEnabled = 
UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, 
+        LoadedLogOffsets offsets = new LogLoader(
+                dir,
+                topicPartition,
+                config,
+                scheduler,
+                time,
+                logDirFailureChannel,
+                lastShutdownClean,
+                segments,
+                logStartOffset,
+                recoveryPoint,
+                leaderEpochCache,
+                producerStateManager,
+                numRemainingSegments,
+                isRemoteLogEnabled
+                ).load();
+        LocalLog localLog = new LocalLog(
+                dir,
+                config,
+                segments,
+                offsets.recoveryPoint,
+                offsets.nextOffsetMetadata,
+                scheduler,
+                time,
+                topicPartition,
+                logDirFailureChannel);
+        return new UnifiedLog(offsets.logStartOffset,
+                localLog,
+                brokerTopicStats,
+                producerIdExpirationCheckIntervalMs,
+                leaderEpochCache,
+                producerStateManager,
+                topicId,
+                remoteStorageSystemEnable,
+                logOffsetsListener);
+    }
+    public long localLogStartOffset() {
+        return localLogStartOffset;
+    }
+    public LeaderEpochFileCache leaderEpochCache() {
+        return leaderEpochCache;
+    }
+    public long logStartOffset() {
+        return logStartOffset;
+    }
+    long highestOffsetInRemoteStorage() {
+        return highestOffsetInRemoteStorage;
+    }
+    public Optional<PartitionMetadataFile> partitionMetadataFile() {
+        return partitionMetadataFile;
+    }
+    public Optional<Uuid> topicId() {
+        return topicId;
+    }
+    public File dir() {
+        return localLog.dir();
+    }
+    public String parentDir() {
+        return localLog.parentDir();
+    }
+    public File parentDirFile() {
+        return localLog.parentDirFile();
+    }
+    public String name() {
+        return localLog.name();
+    }
+    public long recoveryPoint() {
+        return localLog.recoveryPoint();
+    }
+    public TopicPartition topicPartition() {
+        return localLog.topicPartition();
+    }
+    public LogDirFailureChannel logDirFailureChannel() {
+        return localLog.logDirFailureChannel();
+    }
+    public LogConfig config() {
+        return localLog.config();
+    }
+    public boolean remoteLogEnabled() {
+        return UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, 
config(), topicPartition().topic());
+    }
+    public ScheduledFuture<?> producerExpireCheck() {
+        return producerExpireCheck;
+    }
+    public int producerIdExpirationCheckIntervalMs() {
+        return producerIdExpirationCheckIntervalMs;
+    }
+    public void updateLogStartOffsetFromRemoteTier(long remoteLogStartOffset) {
+        if (!remoteLogEnabled()) {
+            logger.error("Ignoring the call as the remote log storage is 
+            return;
+        }
+        maybeIncrementLogStartOffset(remoteLogStartOffset, 
+    }
+    public void updateLocalLogStartOffset(long offset) throws IOException {

Review Comment:
   Added comments on a few methods only used by/visible for tests

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:

Reply via email to