This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77aadf1fd14 [Fix](job)Fix for Duplicate Scheduling of Tasks (#46872)
77aadf1fd14 is described below

commit 77aadf1fd14f620ddd4b5401d0cc7e1891d82584
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Mon Jan 13 16:22:30 2025 +0800

    [Fix](job)Fix for Duplicate Scheduling of Tasks (#46872)
    
    ### Problem Description
    The current scheduling logic calculates the next scheduled time and adds
    it to the task queue when the condition triggerTime <= windowEndTimeMs
    is met. However, this can lead to a task being scheduled twice if its
    triggerTime is exactly equal to windowEndTimeMs:
    
    - The task is added to the current scheduling window.
    - At the same time, this timestamp becomes the startTime for the next
    scheduling window, causing the task to be scheduled again.
    
    ### Changes Made
    Updated the loop condition from triggerTime <= windowEndTimeMs to
    triggerTime < windowEndTimeMs. The window's end time is now exclusive, so
    a trigger time that falls exactly on windowEndTimeMs is left to the next
    scheduling window (where it is the start time) and is scheduled only once.
---
 .../doris/job/base/JobExecutionConfiguration.java  |  2 +-
 .../job/base/JobExecutionConfigurationTest.java    | 59 ++++++++++++++++++++--
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index d564b114312..80e8b0cf5e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -181,7 +181,7 @@ public class JobExecutionConfiguration {
         }
 
         // Calculate the trigger time list
-        for (long triggerTime = firstTriggerTime; triggerTime <= 
windowEndTimeMs; triggerTime += intervalMs) {
+        for (long triggerTime = firstTriggerTime; triggerTime < 
windowEndTimeMs; triggerTime += intervalMs) {
             if (null == timerDefinition.getEndTimeMs()
                     || triggerTime < timerDefinition.getEndTimeMs()) {
                 timerDefinition.setLatestSchedulerTimeMs(triggerTime);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index 163b2494189..fb0600b281f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -17,15 +17,20 @@
 
 package org.apache.doris.job.base;
 
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.IntervalUnit;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 public class JobExecutionConfigurationTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobExecutionConfigurationTest.class);
+
     @Test
     public void testGetTriggerDelayTimesOneTime() {
         JobExecutionConfiguration configuration = new 
JobExecutionConfiguration();
@@ -73,16 +78,64 @@ public class JobExecutionConfigurationTest {
         long second = 1000L;
         timerDefinition.setStartTimeMs(second);
         timerDefinition.setInterval(1L);
-        Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5 + 10L, second * 3, second * 7).size());
-        Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 
5, second * 5, second * 7).size());
+        Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 
5 + 10L, second * 3, second * 7).size());
+        Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 
5, second * 5, second * 7).size());
         timerDefinition.setStartTimeMs(1672531200000L);
         timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
         timerDefinition.setInterval(1L);
-        Assertions.assertArrayEquals(new Long[]{0L}, 
configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 
1672531800000L).toArray());
 
         List<Long> expectDelayTimes = 
configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 
1672531850000L);
 
         Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 
300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
+        timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
+        timerDefinition.setInterval(1L);
+        timerDefinition.setStartTimeMs(1577808000000L);
+        // Log detailed time information
+        LOG.info("Current time is: "
+                + TimeUtils.longToTimeStringWithms(1736459699000L));
+        LOG.info("Start time window is: "
+                + TimeUtils.longToTimeStringWithms(1736459698000L));
+        LOG.info("Latest batch scheduler timer task time is: "
+                + TimeUtils.longToTimeStringWithms(1736460299000L));
+
+        // Get and log trigger delay times
+        delayTimes = configuration.getTriggerDelayTimes(1736459699000L, 
1736459698000L,
+                1736460299000L);
+        Assertions.assertEquals(10, delayTimes.size());
+        LOG.info("Trigger delay times size: " + delayTimes.size());
+        delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 
1000 + 1736459699000L)));
+
+        LOG.info("----");
+
+        // Log detailed time information
+        LOG.info("Current time is: "
+                + TimeUtils.longToTimeStringWithms(1736460901000L));
+        LOG.info("Start time window is: "
+                + TimeUtils.longToTimeStringWithms(1736460900000L));
+        LOG.info("Latest batch scheduler timer task time is: "
+                + TimeUtils.longToTimeStringWithms(1736461501000L));
+
+        // Get and log trigger delay times
+        delayTimes = configuration.getTriggerDelayTimes(1736460901000L, 
1736460900000L,
+                1736461501000L);
+        Assertions.assertEquals(11, delayTimes.size());
+        LOG.info("Trigger delay times size: " + delayTimes.size());
+        delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 
1000 + 1736460901000L)));
+
+        LOG.info("----");
+
+        // Log detailed time information
+        LOG.info("Current time is: " + 
TimeUtils.longToTimeStringWithms(1736461502000L));
+        LOG.info("Start time window is: " + 
TimeUtils.longToTimeStringWithms(1736461501000L));
+        LOG.info("Latest batch scheduler timer task time is: "
+                + TimeUtils.longToTimeStringWithms(1736462102000L));
+
+        // Get and log trigger delay times
+        delayTimes = configuration.getTriggerDelayTimes(1736461502000L, 
1736461501000L,
+                1736462102000L);
+        Assertions.assertEquals(10, delayTimes.size());
+        LOG.info("Trigger delay times size: " + delayTimes.size());
+        delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 
1000 + 1736461502000L)));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to