rpuch commented on code in PR #6330:
URL: https://github.com/apache/ignite-3/pull/6330#discussion_r2239571638


##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -212,12 +222,54 @@ public void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer) {
     }

     private void scheduleUpdateLowWatermarkBusy() {
+        scheduleUpdateLowWatermarkTaskLock.lock();
+
+        try {
+            ScheduledUpdateLowWatermarkTask lastTask = lastScheduledUpdateLowWatermarkTask.get();
+            ScheduledUpdateLowWatermarkTask newTask = new ScheduledUpdateLowWatermarkTask(this, State.NEW);
+
+            State lastTaskState = lastTask == null ? State.COMPLETED : lastTask.state();
+
+            switch (lastTaskState) {
+                case NEW:
+                    if (lastTask.tryCancel()) {
+                        boolean casResult = lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask);
+
+                        assert casResult : "It is forbidden to set a task in parallel";
+
+                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+                    }
+
+                    break;
+                case IN_PROGRESS:
+                case CANCELLED:

Review Comment:
   Why will it be rescheduled? And why don't we need to reschedule our new task if the previous task was cancelled (and, hence, did not schedule any continuation)?



##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -124,6 +128,10 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
     private final Map<UUID, LowWatermarkLock> locks = new ConcurrentHashMap<>();

+    private final AtomicReference<ScheduledUpdateLowWatermarkTask> lastScheduledUpdateLowWatermarkTask = new AtomicReference<>();

Review Comment:
   This seems to be accessed only under the lock. Why is `AtomicReference` needed here?
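
   To illustrate the alternative this question points at, here is a minimal sketch of a plain field guarded by a lock. It assumes that every read and write of the field really does happen while the lock is held, which has not been checked against the rest of the PR. `LastTaskHolder`, `replace` and `current` are hypothetical names used only for this example; they are not code from the PR.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical holder for the most recently scheduled task (illustrative names only). */
final class LastTaskHolder {
    private final ReentrantLock lock = new ReentrantLock();

    // Plain field instead of AtomicReference: it is read and written only while
    // `lock` is held, so the lock already provides mutual exclusion and visibility.
    private Runnable lastTask;

    /** Stores {@code newTask} as the latest task and returns the one it replaced, or null if none. */
    Runnable replace(Runnable newTask) {
        lock.lock();

        try {
            Runnable previous = lastTask;

            lastTask = newTask;

            return previous;
        } finally {
            lock.unlock();
        }
    }

    /** Returns the latest task, or null if nothing has been stored yet. */
    Runnable current() {
        lock.lock();

        try {
            return lastTask;
        } finally {
            lock.unlock();
        }
    }
}
```

   If some access turns out to happen outside the lock, this simplification does not apply and the `AtomicReference` (or a `volatile` field) would still be needed.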


##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -339,16 +395,12 @@ CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
                 .whenCompleteAsync((unused, throwable) -> {
                     if (throwable != null) {
                         if (!(hasCause(throwable, NodeStoppingException.class))) {
-                            LOG.error("Failed to update low watermark, will schedule again: {}", throwable, newLowWatermark);
+                            LOG.error("Failed to update low watermark: {}", throwable, newLowWatermark);
                             failureManager.process(new FailureContext(CRITICAL_ERROR, throwable));
-
-                            inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);

Review Comment:
   Why is this removed?



##########
modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java:
##########
@@ -336,6 +342,36 @@ void testUpdateAndNotifyNotInvokeFailureManagerWhenGetNodeStoppingException() {
         verify(failureManager, never()).process(any());
     }

+    @Test
+    void testParallelScheduleUpdates() throws Exception {
+        assertThat(lowWatermarkConfig.updateIntervalMillis().update(300L), willCompleteSuccessfully());
+
+        assertThat(lowWatermark.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        runRace(
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates()
+        );
+
+        Thread.sleep(1_000);
+
+        verify(lwmChangedListener, atLeast(2)).notify(any());

Review Comment:
   Why is it at least 2? It seems that each subsequent task might cancel the previous one, so why can't it be just 1 (the last one)?