This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13add3c1716 [fix](job)Fix millisecond offset issue in time window 
scheduling trigger time calculation (#45176)
13add3c1716 is described below

commit 13add3c17164c1a5f33dcfd34b4b11932a16ee67
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Thu Dec 12 16:42:19 2024 +0800

    [fix](job)Fix millisecond offset issue in time window scheduling trigger 
time calculation (#45176)
    
    ### Abstract:
    In the current time window scheduling logic, the calculation of trigger
    times was not strictly aligned to the second level, which could lead to
    millisecond offsets. This offset caused issues such as consecutive
    trigger times at 14:56:59 and 14:57:00, disrupting the correctness of
    the scheduling.
    
    This PR optimizes the calculation of trigger times to ensure that time
    points are strictly aligned to the second level, preventing the
    accumulation of millisecond errors.
    
    ### Issue Description:
    
    Under a specified window (e.g., 14:50:00 to 14:59:00) and a fixed
    interval (e.g., every minute), the scheduler generated erroneous trigger
    times such as:
    
    ```
    | 2024-12-04 14:56:59 |
    | 2024-12-04 14:57:00 |
    | 2024-12-04 14:57:59 |
    | 2024-12-04 14:58:00 |
    ```
    #### Cause:
    Neither the firstTriggerTime calculation nor the loop that follows it
    strictly aligned trigger times to the second level, resulting in erroneous
    trigger points due to floating-point or millisecond offset accumulation.
    In addition, the end condition for the time window was not aligned to the
    second level, which could lead to additional trigger times being included.
    
    ### Fix:
    Modification 1: Strictly align the trigger time to the second level.
---
 .../main/java/org/apache/doris/common/util/TimeUtils.java   | 11 +++++++++++
 .../apache/doris/job/base/JobExecutionConfiguration.java    |  2 +-
 .../java/org/apache/doris/job/base/TimerDefinition.java     |  7 ++++++-
 .../java/org/apache/doris/job/scheduler/JobScheduler.java   | 13 +++++++++----
 .../doris/job/base/JobExecutionConfigurationTest.java       |  7 +++++++
 5 files changed, 34 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index e7066846c30..d88971a6e72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -257,6 +257,17 @@ public class TimeUtils {
         return d.getTime();
     }
 
+    /**
+     * Aligns a millisecond timestamp to a whole second by dropping its sub-second part.
+     *
+     * @param timestamp The millisecond timestamp to be aligned.
+     * @return The timestamp in milliseconds, truncated (not rounded) to a multiple of 1000.
+     */
+    public static long convertToSecondTimestamp(long timestamp) {
+        // Divide by 1000 to convert to seconds, then multiply by 1000 to 
return to milliseconds with no fractional part
+        return (timestamp / 1000) * 1000;
+    }
+
     public static long timeStringToLong(String timeStr, TimeZone timeZone) {
         DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
         dateFormatTimeZone.withZone(timeZone.toZoneId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 4c6ef4d2037..d564b114312 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -155,7 +155,7 @@ public class JobExecutionConfiguration {
             return 0L;
         }
 
-        return (startTimeMs - currentTimeMs) / 1000;
+        return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
     }
 
     // Returns a list of delay times in seconds for executing the job within 
the specified window
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
index 9068a18f693..96181877b9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.job.base;
 
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.IntervalUnit;
 
 import com.google.gson.annotations.SerializedName;
@@ -40,11 +41,15 @@ public class TimerDefinition {
 
     public void checkParams() {
         if (null == startTimeMs) {
-            startTimeMs = System.currentTimeMillis() + 
intervalUnit.getIntervalMs(interval);
+            long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+            startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
         }
         if (null != endTimeMs && endTimeMs < startTimeMs) {
             throw new IllegalArgumentException("endTimeMs must be greater than 
the start time");
         }
+        if (null != endTimeMs) {
+            endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
+        }
 
         if (null != intervalUnit) {
             if (null == interval) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 921f333791c..2bd6fc04dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
         taskDisruptorGroupManager = new TaskDisruptorGroupManager();
         taskDisruptorGroupManager.init();
         this.timerJobDisruptor = 
taskDisruptorGroupManager.getDispatchDisruptor();
-        latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
+        long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+        latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
         batchSchedulerTimerJob();
         cycleSystemSchedulerTasks();
     }
@@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
      * Jobs will be re-registered after the task is completed
      */
     private void cycleSystemSchedulerTasks() {
-        log.info("re-register system scheduler timer tasks" + 
TimeUtils.longToTimeString(System.currentTimeMillis()));
+        log.info("re-register system scheduler timer tasks, time is " + 
TimeUtils
+                .longToTimeStringWithms(System.currentTimeMillis()));
         timerTaskScheduler.newTimeout(timeout -> {
             batchSchedulerTimerJob();
             cycleSystemSchedulerTasks();
@@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
 
 
     private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
-        List<Long> delaySeconds = 
job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
+        long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+        startTimeWindowMs = 
TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
+        List<Long> delaySeconds = 
job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
                 startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
         if (CollectionUtils.isEmpty(delaySeconds)) {
             log.info("skip job {} scheduler timer job, delay seconds is 
empty", job.getJobName());
@@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
 
         long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
         if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
-            this.latestBatchSchedulerTimerTaskTimeMs = 
System.currentTimeMillis();
+            long currentTimeMs = 
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+            this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
         }
         this.latestBatchSchedulerTimerTaskTimeMs += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
         log.info("execute timer job ids within last ten minutes window, last 
time window is {}",
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index cce0a93c01d..163b2494189 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest {
         timerDefinition.setInterval(1L);
         Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5 + 10L, second * 3, second * 7).size());
         Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5, second * 5, second * 7).size());
+        timerDefinition.setStartTimeMs(1672531200000L);
+        timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
+        timerDefinition.setInterval(1L);
+        Assertions.assertArrayEquals(new Long[]{0L}, 
configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 
1672531800000L).toArray());
+
+        List<Long> expectDelayTimes = 
configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 
1672531850000L);
 
+        Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 
300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to