rpuch commented on code in PR #6330:
URL: https://github.com/apache/ignite-3/pull/6330#discussion_r2239571638


##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -212,12 +222,54 @@ public void getLowWatermarkSafe(Consumer<@Nullable 
HybridTimestamp> consumer) {
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
+        scheduleUpdateLowWatermarkTaskLock.lock();
+
+        try {
+            ScheduledUpdateLowWatermarkTask lastTask = 
lastScheduledUpdateLowWatermarkTask.get();
+            ScheduledUpdateLowWatermarkTask newTask = new 
ScheduledUpdateLowWatermarkTask(this, State.NEW);
+
+            State lastTaskState = lastTask == null ? State.COMPLETED : 
lastTask.state();
+
+            switch (lastTaskState) {
+                case NEW:
+                    if (lastTask.tryCancel()) {
+                        boolean casResult = 
lastScheduledUpdateLowWatermarkTask.compareAndSet(lastTask, newTask);
+
+                        assert casResult : "It is forbidden to set a task in 
parallel";
+
+                        scheduleUpdateLowWatermarkTaskBusy(newTask);
+                    }
+
+                    break;
+                case IN_PROGRESS:
+                case CANCELLED:

Review Comment:
   Why will it be rescheduled? And why don't we need to schedule our new task 
if the previous task was cancelled (and, hence, did not schedule any 
continuation)?



##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -124,6 +128,10 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     private final Map<UUID, LowWatermarkLock> locks = new 
ConcurrentHashMap<>();
 
+    private final AtomicReference<ScheduledUpdateLowWatermarkTask> 
lastScheduledUpdateLowWatermarkTask = new AtomicReference<>();

Review Comment:
   This seems to be accessed only under the lock. Why is `AtomicReference` 
needed here?
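
   To illustrate the question, here is a minimal sketch of a plain field 
guarded by a lock. The names (`GuardedTaskHolder`, `replace`) are hypothetical 
and this is not the PR's code; it only assumes that every read and write of 
the field happens while the lock is held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration, not the PR's code.
class GuardedTaskHolder {
    private final ReentrantLock lock = new ReentrantLock();

    // Plain field: atomicity and visibility come from the lock, so no AtomicReference is used.
    private Runnable lastTask;

    /** Stores the new task and returns the previously stored one, or null if there was none. */
    Runnable replace(Runnable newTask) {
        lock.lock();

        try {
            Runnable previous = lastTask;

            lastTask = newTask;

            return previous;
        } finally {
            lock.unlock();
        }
    }
}
```

   If the field is ever read outside the lock, this sketch no longer applies 
and an atomic or volatile field would be justified.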



##########
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java:
##########
@@ -339,16 +395,12 @@ CompletableFuture<Void> updateAndNotify(HybridTimestamp 
newLowWatermark) {
                             .whenCompleteAsync((unused, throwable) -> {
                                 if (throwable != null) {
                                     if (!(hasCause(throwable, 
NodeStoppingException.class))) {
-                                        LOG.error("Failed to update low 
watermark, will schedule again: {}", throwable, newLowWatermark);
+                                        LOG.error("Failed to update low 
watermark: {}", throwable, newLowWatermark);
 
                                         failureManager.process(new 
FailureContext(CRITICAL_ERROR, throwable));
-
-                                        inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);

Review Comment:
   Why is this rescheduling call removed?



##########
modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java:
##########
@@ -336,6 +342,36 @@ void 
testUpdateAndNotifyNotInvokeFailureManagerWhenGetNodeStoppingException() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void testParallelScheduleUpdates() throws Exception {
+        assertThat(lowWatermarkConfig.updateIntervalMillis().update(300L), 
willCompleteSuccessfully());
+
+        assertThat(lowWatermark.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        runRace(
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates(),
+                () -> lowWatermark.scheduleUpdates()
+        );
+
+        Thread.sleep(1_000);
+
+        verify(lwmChangedListener, atLeast(2)).notify(any());

Review Comment:
   Why is it at least 2? It seems that each subsequent task might cancel the 
previous one, so why can't it be just 1 (the last one)?
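
   A minimal sketch of the scenario I have in mind, using a plain 
`ScheduledExecutorService` and hypothetical names (`CancelPreviousSketch`, 
`runCount`). It models one-shot tasks only and does not model any periodic 
rescheduling the PR's task may perform, so it is an assumption about the 
behaviour, not a reproduction of it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not the PR's code.
class CancelPreviousSketch {
    /** Schedules one-shot delayed tasks, cancelling the previous one each time; returns how many ran. */
    static int runCount(int attempts, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> last = null;

        for (int i = 0; i < attempts; i++) {
            if (last != null) {
                // The previous task has not started yet, so cancellation prevents it from running.
                last.cancel(false);
            }

            last = executor.schedule(() -> { runs.incrementAndGet(); }, delayMillis, TimeUnit.MILLISECONDS);
        }

        // By default, delayed tasks that are already scheduled still run after shutdown().
        executor.shutdown();

        try {
            executor.awaitTermination(delayMillis + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return runs.get();
    }
}
```

   With four attempts and a 300 ms delay, only the last task is expected to 
run here, which is why `atLeast(2)` looks surprising unless the task 
reschedules itself within the one-second sleep.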


