This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 77aadf1fd14 [Fix](job)Fix for Duplicate Scheduling of Tasks (#46872) 77aadf1fd14 is described below commit 77aadf1fd14f620ddd4b5401d0cc7e1891d82584 Author: Calvin Kirs <guoqi...@selectdb.com> AuthorDate: Mon Jan 13 16:22:30 2025 +0800 [Fix](job)Fix for Duplicate Scheduling of Tasks (#46872) ### Problem Description The current scheduling logic calculates the next scheduled time and adds it to the task queue when the condition triggerTime <= windowEndTimeMs is met. However, this can lead to a task being scheduled twice if its triggerTime is exactly equal to windowEndTimeMs: - The task is added to the current scheduling window. - At the same time, this timestamp becomes the startTime for the next scheduling window, causing the task to be scheduled again. ### Changes Made Updated the condition from triggerTime <= windowEndTimeMs to triggerTime < windowEndTimeMs. This ensures that the scheduling time doesn’t overlap with the window’s end time, preventing duplicate scheduling. --- .../doris/job/base/JobExecutionConfiguration.java | 2 +- .../job/base/JobExecutionConfigurationTest.java | 59 ++++++++++++++++++++-- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index d564b114312..80e8b0cf5e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -181,7 +181,7 @@ public class JobExecutionConfiguration { } // Calculate the trigger time list - for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) { + for (long triggerTime = firstTriggerTime; triggerTime < windowEndTimeMs; triggerTime += intervalMs) { if (null == timerDefinition.getEndTimeMs() || triggerTime < timerDefinition.getEndTimeMs()) { timerDefinition.setLatestSchedulerTimeMs(triggerTime); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 163b2494189..fb0600b281f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -17,15 +17,20 @@ package org.apache.doris.job.base; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.IntervalUnit; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; public class JobExecutionConfigurationTest { + private static final Logger LOG = LoggerFactory.getLogger(JobExecutionConfigurationTest.class); + @Test public void testGetTriggerDelayTimesOneTime() { JobExecutionConfiguration configuration = new JobExecutionConfiguration(); @@ -73,16 +78,64 @@ public class JobExecutionConfigurationTest { long second = 1000L; timerDefinition.setStartTimeMs(second); timerDefinition.setInterval(1L); - Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); - Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); + Assertions.assertEquals(2, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); timerDefinition.setStartTimeMs(1672531200000L); timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); timerDefinition.setInterval(1L); - Assertions.assertArrayEquals(new Long[]{0L}, configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray()); List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L); Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray()); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); + timerDefinition.setInterval(1L); + timerDefinition.setStartTimeMs(1577808000000L); + // Log detailed time information + LOG.info("Current time is: " + + TimeUtils.longToTimeStringWithms(1736459699000L)); + LOG.info("Start time window is: " + + TimeUtils.longToTimeStringWithms(1736459698000L)); + LOG.info("Latest batch scheduler timer task time is: " + + TimeUtils.longToTimeStringWithms(1736460299000L)); + + // Get and log trigger delay times + delayTimes = configuration.getTriggerDelayTimes(1736459699000L, 1736459698000L, + 1736460299000L); + Assertions.assertEquals(10, delayTimes.size()); + LOG.info("Trigger delay times size: " + delayTimes.size()); + delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736459699000L))); + + LOG.info("----"); + + // Log detailed time information + LOG.info("Current time is: " + + TimeUtils.longToTimeStringWithms(1736460901000L)); + LOG.info("Start time window is: " + + TimeUtils.longToTimeStringWithms(1736460900000L)); + LOG.info("Latest batch scheduler timer task time is: " + + TimeUtils.longToTimeStringWithms(1736461501000L)); + + // Get and log trigger delay times + delayTimes = configuration.getTriggerDelayTimes(1736460901000L, 1736460900000L, + 1736461501000L); + Assertions.assertEquals(11, delayTimes.size()); + LOG.info("Trigger delay times size: " + delayTimes.size()); + delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736460901000L))); + + LOG.info("----"); + + // Log detailed time information + LOG.info("Current time is: " + TimeUtils.longToTimeStringWithms(1736461502000L)); + LOG.info("Start time window is: " + TimeUtils.longToTimeStringWithms(1736461501000L)); + LOG.info("Latest batch scheduler timer task time is: " + + TimeUtils.longToTimeStringWithms(1736462102000L)); + + // Get and log trigger delay times + delayTimes = configuration.getTriggerDelayTimes(1736461502000L, 1736461501000L, + 1736462102000L); + Assertions.assertEquals(10, delayTimes.size()); + LOG.info("Trigger delay times size: " + delayTimes.size()); + delayTimes.forEach(a -> LOG.info(TimeUtils.longToTimeStringWithms(a * 1000 + 1736461502000L))); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org