This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 13add3c1716 [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation (#45176) 13add3c1716 is described below commit 13add3c17164c1a5f33dcfd34b4b11932a16ee67 Author: Calvin Kirs <guoqi...@selectdb.com> AuthorDate: Thu Dec 12 16:42:19 2024 +0800 [fix](job)Fix millisecond offset issue in time window scheduling trigger time calculation (#45176) ### Abstract: In the current time window scheduling logic, the calculation of trigger times was not strictly aligned to the second level, which could lead to millisecond offsets. This offset caused issues such as consecutive trigger times at 14:56:59 and 14:57:00, disrupting the correctness of the scheduling. This PR optimizes the calculation of trigger times to ensure that time points are strictly aligned to the second level, preventing the accumulation of millisecond errors. ### Issue Description: Under a specified window (e.g., 14:50:00 to 14:59:00) and a fixed interval (e.g., every minute), the scheduler generated erroneous trigger times such as: ``` | 2024-12-04 14:56:59 | | 2024-12-04 14:57:00 | | 2024-12-04 14:57:59 | | 2024-12-04 14:58:00 | ``` #### Cause: The current firstTriggerTime and the loop calculation did not strictly align trigger times to the second level, resulting in erroneous trigger points due to floating-point or millisecond offset accumulation. The end condition for the time window was not aligned to the second level, which could lead to additional trigger times being included. ### Fix: Modification 1: Strictly align the trigger time to the second level. 
--- .../main/java/org/apache/doris/common/util/TimeUtils.java | 11 +++++++++++ .../apache/doris/job/base/JobExecutionConfiguration.java | 2 +- .../java/org/apache/doris/job/base/TimerDefinition.java | 7 ++++++- .../java/org/apache/doris/job/scheduler/JobScheduler.java | 13 +++++++++---- .../doris/job/base/JobExecutionConfigurationTest.java | 7 +++++++ 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index e7066846c30..d88971a6e72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -257,6 +257,17 @@ public class TimeUtils { return d.getTime(); } + /** + * Converts a millisecond timestamp to a second-level timestamp. + * + * @param timestamp The millisecond timestamp to be converted. + * @return The timestamp truncated to a whole second, i.e. with its millisecond part dropped (value is still in milliseconds). 
+ */ + public static long convertToSecondTimestamp(long timestamp) { + // Divide by 1000 to convert to seconds, then multiply by 1000 to return to milliseconds with no fractional part + return (timestamp / 1000) * 1000; + } + public static long timeStringToLong(String timeStr, TimeZone timeZone) { DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone(); dateFormatTimeZone.withZone(timeZone.toZoneId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 4c6ef4d2037..d564b114312 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -155,7 +155,7 @@ public class JobExecutionConfiguration { return 0L; } - return (startTimeMs - currentTimeMs) / 1000; + return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000; } // Returns a list of delay times in seconds for executing the job within the specified window diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index 9068a18f693..96181877b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -17,6 +17,7 @@ package org.apache.doris.job.base; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.IntervalUnit; import com.google.gson.annotations.SerializedName; @@ -40,11 +41,15 @@ public class TimerDefinition { public void checkParams() { if (null == startTimeMs) { - startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval); } if 
(null != endTimeMs && endTimeMs < startTimeMs) { throw new IllegalArgumentException("endTimeMs must be greater than the start time"); } + if (null != endTimeMs) { + endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs); + } if (null != intervalUnit) { if (null == interval) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 921f333791c..2bd6fc04dac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { taskDisruptorGroupManager = new TaskDisruptorGroupManager(); taskDisruptorGroupManager.init(); this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor(); - latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); } @@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { * Jobs will be re-registered after the task is completed */ private void cycleSystemSchedulerTasks() { - log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); + log.info("re-register system scheduler timer tasks, time is " + TimeUtils + .longToTimeStringWithms(System.currentTimeMillis())); timerTaskScheduler.newTimeout(timeout -> { batchSchedulerTimerJob(); cycleSystemSchedulerTasks(); @@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { private void cycleTimerJobScheduler(T job, long startTimeWindowMs) { - List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(), + long 
currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs); + List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs, startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs); if (CollectionUtils.isEmpty(delaySeconds)) { log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName()); @@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs; if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { - this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); + long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis()); + this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs; } this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; log.info("execute timer job ids within last ten minutes window, last time window is {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index cce0a93c01d..163b2494189 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest { timerDefinition.setInterval(1L); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5 + 10L, second * 3, second * 7).size()); Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second * 5, second * 5, second * 7).size()); + timerDefinition.setStartTimeMs(1672531200000L); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); + timerDefinition.setInterval(1L); + Assertions.assertArrayEquals(new Long[]{0L}, 
configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L, 1672531800000L).toArray()); + + List<Long> expectDelayTimes = configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L, 1672531850000L); + Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L, 300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org