github-actions[bot] commented on code in PR #61199:
URL: https://github.com/apache/doris/pull/61199#discussion_r2944748502


##########
gensrc/thrift/AgentService.thrift:
##########
@@ -459,6 +459,7 @@ struct TPartitionVersionInfo {
     1: required Types.TPartitionId partition_id
     2: required Types.TVersion version
     3: required Types.TVersionHash version_hash // Deprecated
+    4: required i64 commit_tso
 }
 

Review Comment:
   **[Critical — Rolling Upgrade Breakage]**
   
   `commit_tso` is declared as `required`, but it is a newly added field. During a rolling upgrade, an **old FE** (without this field) serializes `TPartitionVersionInfo` with only 3 fields. On the new BE, Thrift's generated C++ `read()` throws `TProtocolException` when a `required` field is absent, so the request carrying the struct fails to deserialize rather than merely defaulting. Even where the field is defaulted instead, it reads as `0` (Thrift's default for int64), **not** `-1` (the application-level sentinel meaning "no TSO").
   
   In that second case the result is **silent semantic corruption**: rowsets are stamped with `commit_tso=0` instead of the intended `-1` default (as defined in `olap_file.proto` with `[default = -1]`), and any downstream logic that distinguishes `commit_tso > 0` from `commit_tso == -1` behaves incorrectly.
   
   **Fix:** Change to `4: optional i64 commit_tso` and have the BE check 
`__isset.commit_tso` to fall back to `-1` when absent. Alternatively, set a 
default: `4: optional i64 commit_tso = -1`.
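
   The same fallback applies on the Java side. There, Thrift generates an `isSetCommitTso()` accessor for an optional `commit_tso`, which is the counterpart of `__isset.commit_tso` in C++. Below is a minimal sketch of the read-side fallback. `PartitionVersionInfoSketch` is a hypothetical hand-written stand-in, not the generated class:

```java
// Hypothetical stand-in for a Thrift-generated struct with an optional
// commit_tso field; it mimics the generated "is set" tracking.
final class PartitionVersionInfoSketch {
    // Application-level sentinel meaning "no TSO was assigned".
    static final long NO_TSO = -1L;

    private long commitTso;         // stays 0 when the sender omits the field
    private boolean commitTsoIsSet; // true only if the field was actually received

    void setCommitTso(long value) {
        commitTso = value;
        commitTsoIsSet = true;
    }

    boolean isSetCommitTso() {
        return commitTsoIsSet;
    }

    // Read-side fallback: an absent field maps to the sentinel, never to 0.
    long commitTsoOrDefault() {
        return isSetCommitTso() ? commitTso : NO_TSO;
    }

    public static void main(String[] args) {
        PartitionVersionInfoSketch fromOldFe = new PartitionVersionInfoSketch();
        PartitionVersionInfoSketch fromNewFe = new PartitionVersionInfoSketch();
        fromNewFe.setCommitTso(123_456_789L);
        System.out.println(fromOldFe.commitTsoOrDefault()); // -1
        System.out.println(fromNewFe.commitTsoOrDefault()); // 123456789
    }
}
```

   Tracking presence separately means a legitimately sent `0` is still distinguishable from an absent field.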



##########
regression-test/data/query_p0/system/test_query_sys_rowsets.out.rej:
##########
@@ -0,0 +1,9 @@
+diff a/regression-test/data/query_p0/system/test_query_sys_rowsets.out b/regression-test/data/query_p0/system/test_query_sys_rowsets.out       (rejected hunks)
+@@ -13,6 +13,7 @@ DATA_DISK_SIZE        bigint  Yes     false   \N

Review Comment:
   **[High — Merge Conflict Artifact]**
   
   This `.rej` file is a patch-reject artifact (the kind written by `git apply --reject` or `patch` when a hunk fails to apply) and **must not be committed**. The corresponding `.out` file (`test_query_sys_rowsets.out`) also needs to be updated with the `COMMIT_TSO` column.
   
   Same applies to `test_query_sys_scan_rowsets.out.rej`.



##########
fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java:
##########
@@ -102,13 +102,15 @@ public final class FeMetaVersion {
     public static final int VERSION_139 = 139;
 
     public static final int VERSION_140 = 140;
+    // For tso
+    public static final int VERSION_141 = 141;
 
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_140;
+    public static final int VERSION_CURRENT = VERSION_141;
 
 
     // all logs meta version should >= the minimum version, so that we could remove many if clause, for example
     // if (FE_METAVERSION < VERSION_94) ...
     // these clause will be useless and we could remove them
-    public static final int MINIMUM_VERSION_REQUIRED = VERSION_140;
+    public static final int MINIMUM_VERSION_REQUIRED = VERSION_141;
 }

Review Comment:
   **[Critical — Prevents Rollback for Experimental Feature]**
   
   Bumping `MINIMUM_VERSION_REQUIRED` from 140 to 141 means:
   1. Once any FE with this code writes an image (which uses 
`VERSION_CURRENT=141`), **no older FE** can read it.
   2. Conversely, this FE **refuses to load** any image with version < 141.
   
   This creates a **hard one-way migration gate** for a feature that is:
   - Explicitly experimental (`varType = VariableAnnotation.EXPERIMENTAL`)
   - Disabled by default (`enable_feature_tso = false`)
   - Guarded by three separate config flags
   
   No code anywhere checks `VERSION_141` conditionally — there are zero `if 
(version >= VERSION_141)` guards in the codebase. The TSO data is persisted via:
   - Gson `@SerializedName` fields (backward-compatible — unknown fields are 
ignored)
   - An image module that writes **nothing** when disabled
   - An EditLog opcode only emitted when `enable_tso_persist_journal=true`
   
   **Fix:** Do NOT bump `MINIMUM_VERSION_REQUIRED`. Only bump `VERSION_CURRENT` 
if you need a marker. The module system already handles unknown/empty modules 
gracefully (`MetaReader` skips empty modules and has 
`ignore_unknown_metadata_module` for unknown ones).
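
   A simplified model of the gate makes both effects concrete. This is a sketch, not the FE loader. `canLoad` is hypothetical, and the model assumes that an FE accepts an image only when the image version lies in `[MINIMUM_VERSION_REQUIRED, VERSION_CURRENT]` for that FE:

```java
// Hypothetical model of the FE meta-version gate; not the real loader.
final class MetaVersionGateSketch {
    // Constants as proposed in this PR.
    static final int NEW_VERSION_CURRENT = 141;
    static final int NEW_MINIMUM_VERSION_REQUIRED = 141;

    // Assumed rule: an FE accepts an image only if minRequired <= imageVersion <= current.
    static boolean canLoad(int imageVersion, int minRequired, int current) {
        return imageVersion >= minRequired && imageVersion <= current;
    }

    public static void main(String[] args) {
        // New FE reading an image written by an old FE (version 140).
        System.out.println(canLoad(140, NEW_MINIMUM_VERSION_REQUIRED, NEW_VERSION_CURRENT)); // false
        // Old FE (minimum 140, current 140) reading an image written by the new FE.
        System.out.println(canLoad(141, 140, 140)); // false
        // New FE with MINIMUM_VERSION_REQUIRED left at 140 reading an old image.
        System.out.println(canLoad(140, 140, NEW_VERSION_CURRENT)); // true
    }
}
```

   In this model, keeping `MINIMUM_VERSION_REQUIRED` at 140 restores the third case. The second case depends only on `VERSION_CURRENT`.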



##########
fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java:
##########
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tso;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.journal.local.LocalJournal;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.persist.EditLog;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class TSOService extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(TSOService.class);
+
+    // Global timestamp with physical time and logical counter
+    private final TSOTimestamp globalTimestamp = new TSOTimestamp();
+    // Lock for thread-safe access to global timestamp
+    private final ReentrantLock lock = new ReentrantLock();
+    // Guard value for time window updates (in milliseconds)
+    private static final long UPDATE_TIME_WINDOW_GUARD = 1;
+
+    /**
+     * Constructor initializes the TSO service with update interval
+     */
+    public TSOService() {
+        super("TSO-service", Config.tso_service_update_interval_ms);
+    }
+
+    /**
+     * Start the TSO service and calibrate timestamp if needed
+     */
+    @Override
+    public synchronized void start() {
+        super.start();

Review Comment:
   **[Medium — Unnecessary Work When TSO Disabled]**
   
   The `start()` method unconditionally calls `super.start()` (spawning the 
MasterDaemon thread) and runs the calibration loop, regardless of 
`Config.enable_feature_tso`. While `runAfterCatalogReady()` checks the flag, 
the thread still runs every `tso_service_update_interval_ms` (50ms by default) 
doing nothing useful when TSO is disabled.
   
   Consider adding a guard at the top of `start()`:
   ```java
   if (!Config.enable_feature_tso) {
       return;
   }
   ```
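   Note: since the config is `mutable`, an early return here means the daemon never runs if the flag is switched on later. `runAfterCatalogReady()` already calibrates lazily when it finds the timestamp uncalibrated. Supporting dynamic enable therefore means keeping the daemon alive (or starting it when the flag flips), not relying on the calibration loop in `start()`.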
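
   An alternative is to keep the daemon alive but make each tick almost free while the flag is off. That also honors a runtime toggle of the mutable config. Below is a minimal sketch. It assumes a single ticking thread and ignores the persisted time window and the logical counter. `LazyTsoTickSketch` is hypothetical, and its suppliers stand in for `Config.enable_feature_tso` and the wall clock:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch: a tick that is a single flag read while the feature is
// off, and calibrates lazily on the first tick after the flag is turned on.
final class LazyTsoTickSketch {
    private final BooleanSupplier featureEnabled; // stand-in for Config.enable_feature_tso
    private final LongSupplier clockMillis;       // stand-in for the wall clock
    private final AtomicLong physical = new AtomicLong(0L); // 0 means "not calibrated"

    LazyTsoTickSketch(BooleanSupplier featureEnabled, LongSupplier clockMillis) {
        this.featureEnabled = featureEnabled;
        this.clockMillis = clockMillis;
    }

    // Called once per daemon interval.
    void tick() {
        if (!featureEnabled.getAsBoolean()) {
            return; // disabled: no calibration and no timestamp work
        }
        long now = clockMillis.getAsLong();
        if (physical.get() == 0L) {
            physical.set(now); // first enabled tick: calibrate
        } else {
            physical.set(Math.max(physical.get(), now)); // never move backwards
        }
    }

    long physical() {
        return physical.get();
    }

    public static void main(String[] args) {
        boolean[] enabled = {false};
        long[] now = {1_000L};
        LazyTsoTickSketch sketch = new LazyTsoTickSketch(() -> enabled[0], () -> now[0]);
        sketch.tick();
        System.out.println(sketch.physical()); // 0: flag off, nothing calibrated
        enabled[0] = true; // config flipped at runtime
        now[0] = 2_000L;
        sketch.tick();
        System.out.println(sketch.physical()); // 2000: calibrated on the first enabled tick
    }
}
```

   The trade-off is that the thread still wakes every interval, but each disabled tick costs only one flag read.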



##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -3050,4 +3074,32 @@ private void cleanSubTransactions(long transactionId) {
             }
         }
     }
+
+    private long getCommitTSO(TransactionState transactionState, Database db, Set<Long> tableIds) {
+        long tso = -1L;
+        if (!Config.enable_feature_tso) {
+            return tso;
+        }
+        if (tableIds == null || tableIds.isEmpty()) {
+            return tso;
+        }
+        boolean anyEnableTso = false;
+        for (long tableId : tableIds) {
+            Table table = db.getTableNullable(tableId);
+            if (table instanceof OlapTable && ((OlapTable) table).enableTso()) {
+                anyEnableTso = true;
+                break;
+            }
+        }
+        if (!anyEnableTso) {
+            return tso;
+        }
+        long fetched = Env.getCurrentEnv().getTSOService().getTSO();
+        if (fetched <= 0) {
+            LOG.warn("failed to get TSO for txn {}, fallback to -1",
+                    transactionState.getTransactionId());
+            return tso;

Review Comment:
   **[High — Silent Loss of TSO Ordering Guarantee]**
   
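   When `getTSO()` returns `-1` (retries exhausted because the catalog is not ready, this FE is not master, or the logical counter keeps overflowing), `getCommitTSO()` silently falls back to `-1` and the transaction **still commits successfully**. An uncalibrated service does not take this path, because `getTSO()` throws a `RuntimeException` in that case rather than returning `-1`. The user has explicitly opted into TSO ordering by setting `enable_tso=true` on the table, but this commit has no valid TSO.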
   
   The `-1` value is indistinguishable from "TSO was never assigned" (protobuf 
default), breaking any downstream ordering semantics that depend on valid TSO 
values.
   
   Consider:
   1. Throwing an exception / returning an error status to fail the commit when 
TSO is required but unavailable, OR
   2. At minimum, adding a prominent WARNING log with the transaction ID and 
table ID so the issue is observable, AND documenting that TSO is best-effort 
(not guaranteed).
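
   Option 1 could look roughly like the sketch below. `CommitTsoSketch` and `TsoUnavailableException` are hypothetical, and the `LongSupplier` stands in for `TSOService.getTSO()`. The point is only that a table-requested TSO either yields a valid value or aborts the commit, so `-1` keeps a single meaning:

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of failing the commit when TSO is required but unavailable.
// Names are illustrative, not Doris APIs.
final class CommitTsoSketch {
    static final long NO_TSO = -1L;

    static final class TsoUnavailableException extends Exception {
        TsoUnavailableException(String msg) {
            super(msg);
        }
    }

    static long commitTso(long txnId, boolean anyTableEnablesTso, LongSupplier tsoSource)
            throws TsoUnavailableException {
        if (!anyTableEnablesTso) {
            return NO_TSO; // no table asked for TSO: the sentinel is legitimate here
        }
        long tso = tsoSource.getAsLong();
        if (tso <= 0) {
            // Abort instead of committing with -1, which would mean "never requested".
            throw new TsoUnavailableException("no valid TSO for txn " + txnId);
        }
        return tso;
    }

    public static void main(String[] args) throws TsoUnavailableException {
        System.out.println(commitTso(1L, false, () -> -1L)); // -1
        System.out.println(commitTso(2L, true, () -> 42L));  // 42
        try {
            commitTso(3L, true, () -> -1L);
        } catch (TsoUnavailableException e) {
            System.out.println(e.getMessage()); // no valid TSO for txn 3
        }
    }
}
```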



##########
regression-test/suites/tso_p0/test_tso_api.groovy:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+import groovy.json.JsonSlurper
+
+suite("test_tso_api") {
+    def ret = sql "SHOW FRONTEND CONFIG like '%experimental_enable_feature_tso%';"
+    logger.info("${ret}")
+    try {
+        sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_feature_tso' = 'true')"
+        sleep(1000)
+        def currentTime = System.currentTimeMillis()
+
+        // Test TSO API endpoint
+        def url = String.format("http://%s/api/tso", context.config.feHttpAddress)
+
+        // Test 1: Basic TSO API access
+        def result = Http.GET(url, true)
+        logger.info("TSO API response: ${result}")
+
+        assertTrue(result.code == 0)
+        assertEquals(result.msg, "success")
+
+        // Check that all expected fields are present in the response
+        def data = result.data
+        assertTrue(data.containsKey("window_end_physical_time"))
+        assertTrue(data.containsKey("current_tso"))
+        assertTrue(data.containsKey("current_tso_physical_time"))
+        assertTrue(data.containsKey("current_tso_logical_counter"))
+
+        // Validate that TSO values are reasonable
+        assertTrue(data.window_end_physical_time > 0)
+        assertTrue(data.current_tso > 0)
+        assertTrue(data.current_tso_physical_time > 0)
+        assertTrue(data.current_tso_logical_counter >= 0)
+
+        // Test 2: Multiple TSO API calls should return consistent increasing values
+        def result1 = Http.GET(url, true)
+        Thread.sleep(10) // Small delay to ensure time progression
+        def result2 = Http.GET(url, true)
+
+        assertTrue(result1.code == 0)
+        assertTrue(result2.code == 0)
+
+        def tso1 = result1.data.current_tso
+        def tso2 = result2.data.current_tso
+
+        // TSO should be monotonically increasing
+        assertTrue(tso2 >= tso1)
+
+        // Test 3: Validate TSO timestamp structure
+        def physicalTime1 = result1.data.current_tso_physical_time
+        def logicalCounter1 = result1.data.current_tso_logical_counter
+        def physicalTime2 = result2.data.current_tso_physical_time
+        def logicalCounter2 = result2.data.current_tso_logical_counter
+
+        // Physical time should be consistent with TSO
+        assertTrue(physicalTime1 <= tso1)
+        assertTrue(physicalTime2 <= tso2)
+
+        // Test 4: Validate window end time is in the future
+        def windowEndTime = result1.data.window_end_physical_time
+        assertTrue(windowEndTime >= currentTime)
+
+        // Test 5: Test unauthorized access (without credentials)
+        try {
+            def unauthorizedResult = Http.GET(url, false) // false means no auth
+            // Depending on server configuration, this might return 401 or still work
+            logger.info("Unauthorized access result: ${unauthorizedResult}")
+        } catch (Exception e) {
+            logger.info("Expected unauthorized access exception: ${e.getMessage()}")
+        }
+
+        // Test 6: Validate TSO timestamp composition
+        def tsoValue = result1.data.current_tso
+        def physicalTime = result1.data.current_tso_physical_time
+        def logicalCounter = result1.data.current_tso_logical_counter
+
+        // Validate that the TSO is composed correctly from physical time and logical counter
+        // TSO format: 46 bits physical time + 6 bits reserved + 12 bits logical counter
+        def expectedTSO = (physicalTime << 18) | (logicalCounter & 0xFFF)
+        // Note: We're not checking exact equality because of the reserved bits in the middle
+
+        // At least verify that the physical time part matches
+        def extractedPhysicalTime = (tsoValue >> 18) & 0x3FFFFFFFFFFL // 46 bits mask
+        assertEquals(physicalTime, extractedPhysicalTime)
+
+        // And that the logical counter part matches (lowest 12 bits)
+        def extractedLogicalCounter = tsoValue & 0xFFFL // 12 bits mask
+        assertEquals(logicalCounter, extractedLogicalCounter)
+    } finally {
+        sql "ADMIN SET FRONTEND CONFIG ('experimental_enable_feature_tso' = '${ret[0][1]}')"

Review Comment:
   **[Medium — Wrong Bit Mask in Test Validation]**
   
   This test extracts the logical counter using a **12-bit mask** (`0xFFF`), 
but `TSOTimestamp` uses an **18-bit logical counter** (see `LOGICAL_BITS = 18` 
in `TSOTimestamp.java`). The correct mask should be `0x3FFFF` (18 bits).
   
   Similarly, the comment on line 100 says "46 bits physical time + 6 bits 
reserved + 12 bits logical counter" — there are no reserved bits. The format is 
46 bits physical + 18 bits logical.
   
   The test may still pass by coincidence if the logical counter value fits in 
12 bits, but it would fail for counter values >= 4096.
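
   A round-trip check against the 46 + 18 layout would catch this. Below is a minimal sketch. It assumes the composition is `(physical << 18) | logical`, as the test's own shift implies. `TsoLayoutSketch` and its helpers are hypothetical, not `TSOTimestamp`'s API:

```java
// Hypothetical sketch of a 46-bit physical + 18-bit logical TSO layout,
// assuming LOGICAL_BITS = 18 as in TSOTimestamp.java.
final class TsoLayoutSketch {
    static final int LOGICAL_BITS = 18;
    static final long LOGICAL_MASK = (1L << LOGICAL_BITS) - 1; // 0x3FFFF
    static final long PHYSICAL_MASK = (1L << 46) - 1;          // 46-bit mask

    static long compose(long physicalMillis, long logical) {
        return (physicalMillis << LOGICAL_BITS) | (logical & LOGICAL_MASK);
    }

    static long physicalOf(long tso) {
        return (tso >>> LOGICAL_BITS) & PHYSICAL_MASK;
    }

    static long logicalOf(long tso) {
        return tso & LOGICAL_MASK;
    }

    public static void main(String[] args) {
        long tso = compose(1_700_000_000_000L, 5_000L);
        System.out.println(physicalOf(tso)); // 1700000000000
        System.out.println(logicalOf(tso));  // 5000
        System.out.println(tso & 0xFFFL);    // 904: the 12-bit mask loses the 4096 bit
    }
}
```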



##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3956,4 +4002,7 @@ public static int metaServiceRpcRetryTimes() {
                     + "by default"
     })
     public static boolean calc_delete_bitmap_get_versions_waiting_for_pending_txns = true;
+
+

Review Comment:
   **[Low — Trailing Blank Lines]**
   
   Two extra blank lines added at the end of the file before the closing brace. 
Please remove.



##########
fe/fe-core/src/main/java/org/apache/doris/tso/TSOService.java:
##########
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tso;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.journal.local.LocalJournal;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.persist.EditLog;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class TSOService extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(TSOService.class);
+
+    // Global timestamp with physical time and logical counter
+    private final TSOTimestamp globalTimestamp = new TSOTimestamp();
+    // Lock for thread-safe access to global timestamp
+    private final ReentrantLock lock = new ReentrantLock();
+    // Guard value for time window updates (in milliseconds)
+    private static final long UPDATE_TIME_WINDOW_GUARD = 1;
+
+    /**
+     * Constructor initializes the TSO service with update interval
+     */
+    public TSOService() {
+        super("TSO-service", Config.tso_service_update_interval_ms);
+    }
+
+    /**
+     * Start the TSO service and calibrate timestamp if needed
+     */
+    @Override
+    public synchronized void start() {
+        super.start();
+
+        for (int i = 0; i < Config.max_update_tso_retry_count; i++) {
+            lock.lock();
+            boolean isInitialized = false;
+            try {
+                isInitialized = (globalTimestamp.getPhysicalTimestamp() != 0);
+            } finally {
+                lock.unlock();
+            }
+
+            if (!isInitialized) {
+                LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp");
+                try {
+                    calibrateTimestamp();
+                } catch (Exception e) {
+                    LOG.warn("TSO service calibrate timestamp failed", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Periodically update timestamp after catalog is ready
+     * This method is called by the MasterDaemon framework
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        if (!Config.enable_feature_tso) {
+            return;
+        }
+        boolean updated = false;
+        Throwable lastFailure = null;
+        for (int i = 0; i < Config.max_update_tso_retry_count; i++) {
+            lock.lock();
+            boolean isInitialized = false;
+            try {
+                isInitialized = (globalTimestamp.getPhysicalTimestamp() != 0);
+            } finally {
+                lock.unlock();
+            }
+
+            if (!isInitialized) {
+                LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp");
+                try {
+                    calibrateTimestamp();
+                    return;
+                } catch (Exception e) {
+                    lastFailure = e;
+                    LOG.warn("TSO service calibrate timestamp failed", e);
+                }
+            }
+
+            try {
+                updateTimestamp();
+                updated = true;
+                break; // Update successful, exit the loop
+            } catch (Exception e) {
+                lastFailure = e;
+                LOG.warn("TSO service update timestamp failed, retry: {}", i, e);
+                MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L);
+                try {
+                    sleep(Config.tso_service_update_interval_ms);
+                } catch (InterruptedException ie) {
+                    LOG.warn("TSO service sleep interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        if (updated) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("TSO service updated timestamp");
+            }
+        } else if (lastFailure != null) {
+            LOG.warn("TSO service update timestamp failed after {} retries",
+                    Config.max_update_tso_retry_count, lastFailure);
+        } else {
+            LOG.warn("TSO service update timestamp failed after {} retries", Config.max_update_tso_retry_count);
+        }
+    }
+
+    /**
+     * Generate a single TSO timestamp
+     *
+     * @return Composed TSO timestamp combining physical time and logical counter
+     * @throws RuntimeException if TSO is not calibrated or other errors occur
+     */
+    public Long getTSO() {
+        int maxGetTSORetryCount = Math.max(1, Config.max_get_tso_retry_count);
+        for (int i = 0; i < maxGetTSORetryCount; i++) {
+            // Wait for environment to be ready and ensure we're running on master FE
+            Env env = Env.getCurrentEnv();
+            if (env == null || !env.isReady()) {
+                LOG.warn("TSO service wait for catalog ready");
+                try {
+                    sleep(200);
+                } catch (InterruptedException ie) {
+                    LOG.warn("TSO service sleep interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+                continue;
+            } else if (!env.isMaster()) {
+                LOG.warn("TSO service only run on master FE");
+                try {
+                    sleep(200);
+                } catch (InterruptedException ie) {
+                    LOG.warn("TSO service sleep interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+                continue;
+            }
+
+            Pair<Long, Long> pair = generateTSO();
+            long physical = pair.first;
+            long logical = pair.second;
+
+            if (physical == 0) {
+                throw new RuntimeException("TSO timestamp is not calibrated, please check");
+            }
+
+            // Check for logical counter overflow
+            if (logical >= TSOTimestamp.MAX_LOGICAL_COUNTER) {
+                LOG.warn("TSO timestamp logical counter overflow, please check");
+                try {
+                    sleep(Config.tso_service_update_interval_ms);
+                } catch (InterruptedException ie) {
+                    LOG.warn("TSO service sleep interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+                continue;
+            }
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L);
+            }
+            return TSOTimestamp.composeTimestamp(physical, logical);
+        }
+        return -1L;
+    }
+
+    /**
+     * Get the current composed TSO timestamp
+     *
+     * @return Current TSO timestamp combining physical time and logical counter
+     */
+    public long getCurrentTSO() {
+        lock.lock();
+        try {
+            return globalTimestamp.composeTimestamp();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Calibrate the TSO timestamp when service starts
+     * This ensures the timestamp is consistent with the last persisted value
+     *
+     * Algorithm:
+     * - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1
+     * - Otherwise Tnext = Tnow
+     */
+    private void calibrateTimestamp() {
+        // Check if Env is ready before calibration
+        Env env = Env.getCurrentEnv();
+        if (env == null || !env.isReady() || !env.isMaster()) {
+            LOG.warn("Env is not ready or not master, skip TSO timestamp calibration");
+            return;
+        }
+
+        long timeLast = env.getWindowEndTSO(); // Last timestamp from EditLog replay
+        long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
+
+        // Calculate next physical time to ensure monotonicity
+        long nextPhysicalTime;
+        if (timeNow - timeLast < 1) {
+            nextPhysicalTime = timeLast + 1;
+        } else {
+            nextPhysicalTime = timeNow;
+        }
+
+        // Construct new timestamp (physical time with reset logical counter)
+        setTSOPhysical(nextPhysicalTime, true);
+
+        // Write the right boundary of time window to BDBJE for persistence
+        long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
+        env.setWindowEndTSO(timeWindowEnd);
+        writeTimestampToBDBJE(timeWindowEnd);
+
+        LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}",
+                timeLast, timeNow, nextPhysicalTime, timeWindowEnd);
+        if (MetricRepo.isInit) {
+            MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L);
+        }
+    }
+
+    /**
+     * Update timestamp periodically to maintain time window
+     * This method handles various time-related issues:
+     * 1. Clock drift detection
+     * 2. Clock backward detection
+     * 3. Logical counter overflow handling
+     * 4. Time window renewal
+     */
+    private void updateTimestamp() {
+        // Check if Env is ready
+        Env env = Env.getCurrentEnv();
+        if (env == null || !env.isReady() || !env.isMaster()) {
+            LOG.warn("Env is not ready or not master, skip TSO timestamp update");
+            return;
+        }
+
+        // 1. Check if TSO has been calibrated
+        long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
+        long prevPhysicalTime = 0;
+        long prevLogicalCounter = 0;
+
+        lock.lock();
+        try {
+            prevPhysicalTime = globalTimestamp.getPhysicalTimestamp();
+            prevLogicalCounter = globalTimestamp.getLogicalCounter();
+        } finally {
+            lock.unlock();
+        }
+
+        if (prevPhysicalTime == 0) {
+            LOG.error("TSO timestamp is not calibrated, please check");
+        }
+
+        // 2. Check for serious clock issues
+        long timeLag = currentTime - prevPhysicalTime;
+        if (timeLag >= 3 * Config.tso_service_update_interval_ms) {
+            // Clock drift (time difference too large), log clearly and trigger corresponding metric
+            LOG.warn("TSO clock drift detected, lastPhysicalTime={}, currentTime={}, "
+                            + "timeLag={} (exceeds 3 * update interval {})",
+                    prevPhysicalTime, currentTime, timeLag, 3 * Config.tso_service_update_interval_ms);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_TSO_CLOCK_DRIFT_DETECTED.increase(1L);
+            }
+        } else if (timeLag < 0) {
+            // Clock backward (current time earlier than last recorded time)
+            // log clearly and trigger corresponding metric
+            LOG.warn("TSO clock backward detected, lastPhysicalTime={}, currentTime={}, "
+                            + "timeLag={} (current time is earlier than last physical time)",
+                    prevPhysicalTime, currentTime, timeLag);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_TSO_CLOCK_BACKWARD_DETECTED.increase(1L);
+            }
+        }
+
+        // 3. Update time based on conditions
+        long nextPhysicalTime = prevPhysicalTime;
+        if (timeLag > UPDATE_TIME_WINDOW_GUARD) {
+            // Align physical time to current time
+            nextPhysicalTime = currentTime;
+        } else if (prevLogicalCounter > TSOTimestamp.MAX_LOGICAL_COUNTER / 2) {
+            // Logical counter nearly full → advance to next millisecond
+            nextPhysicalTime = prevPhysicalTime + 1;
+        } else {
+            // Logical counter not nearly full → just increment logical counter
+            // do nothing
+        }
+
+        // 4. Check if time window right boundary needs renewal
+        if ((env.getWindowEndTSO() - nextPhysicalTime) <= UPDATE_TIME_WINDOW_GUARD) {
+            // Time window right boundary needs renewal
+            long nextWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
+            env.setWindowEndTSO(nextWindowEnd);
+            writeTimestampToBDBJE(nextWindowEnd);
+        }
+
+        // 5. Update global timestamp
+        setTSOPhysical(nextPhysicalTime, false);
+        if (MetricRepo.isInit) {
+            MetricRepo.COUNTER_TSO_CLOCK_UPDATED.increase(1L);
+        }
+    }
+
+    /**
+     * Write the right boundary of TSO time window to BDBJE for persistence
+     *
+     * @param timestamp The timestamp to write
+     */
+    private void writeTimestampToBDBJE(long timestamp) {
+        try {
+            // Check if Env is ready
+            Env env = Env.getCurrentEnv();
+            if (env == null) {
+                LOG.warn("Env is null, skip writing TSO timestamp to BDBJE");
+                return;
+            }
+
+            // Check if Env is ready and is master
+            if (!env.isReady()) {
+                LOG.warn("Env is not ready, skip writing TSO timestamp to BDBJE");
+                return;
+            }
+
+            if (!env.isMaster()) {
+                LOG.warn("Current node is not master, skip writing TSO timestamp to BDBJE");
+                return;
+            }
+
+            TSOTimestamp tsoTimestamp = new TSOTimestamp(timestamp, 0);
+
+            // Check if EditLog is available
+            EditLog editLog = env.getEditLog();
+            if (editLog == null) {
+                LOG.warn("EditLog is null, skip writing TSO timestamp to BDBJE");
+                return;
+            }
+
+            // Additional check to ensure EditLog's journal is properly initialized
+            if (editLog.getJournal() == null) {
+                LOG.warn("EditLog's journal is null, skip writing TSO timestamp to BDBJE");
+                return;
+            }
+
+            if (editLog.getJournal() instanceof LocalJournal) {
+                if (!((LocalJournal) editLog.getJournal()).isReadyToFlush()) {
+                    LOG.warn("EditLog's journal is not ready to flush, skip writing TSO timestamp to BDBJE");
+                    return;
+                }
+            }
+
+            if (Config.enable_tso_persist_journal) {
+                editLog.logTSOTimestampWindowEnd(tsoTimestamp);
+            } else {
+                LOG.debug("TSO timestamp {} is not persisted to journal, "
+                        + "please check if enable_tso_persist_journal is set to true",
+                        tsoTimestamp);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to write TSO timestamp to BDBJE", e);
+        }
+    }
+
+    /**
+     * Generate a single TSO timestamp by incrementing the logical counter
+     *
+     * @return Pair of (physicalTime, updatedLogicalCounter) for the base timestamp
+     */
+    private Pair<Long, Long> generateTSO() {
+        lock.lock();
+        try {
+            long physicalTime = globalTimestamp.getPhysicalTimestamp();
+            if (physicalTime == 0) {
+                return Pair.of(0L, 0L);
+            }
+            long logicalCounter = globalTimestamp.getLogicalCounter();
+            globalTimestamp.setLogicalCounter(logicalCounter + 1);
+            logicalCounter = globalTimestamp.getLogicalCounter();

Review Comment:
   **[Low — Redundant Read-back]**
   
   Line 407 reads back the logical counter immediately after setting it on line 
406, while still under the lock. Since `setLogicalCounter` stores exactly the 
value passed, this is equivalent to `logicalCounter = logicalCounter + 1`. The 
extra getter call adds no value.
   
   ```java
   // Current (redundant):
   globalTimestamp.setLogicalCounter(logicalCounter + 1);
   logicalCounter = globalTimestamp.getLogicalCounter();
   
   // Simplified:
   logicalCounter = logicalCounter + 1;
   globalTimestamp.setLogicalCounter(logicalCounter);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

