tkalkirill commented on code in PR #6199:
URL: https://github.com/apache/ignite-3/pull/6199#discussion_r2189908655


##########
modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java:
##########
@@ -49,12 +62,49 @@ protected int initialNodes() {
         return 1;
     }
 
+    @BeforeAll
+    void waitForDefaultZoneStabilization() throws InterruptedException {
+        if (colocationEnabled()) {
+            awaitAssignmentsStabilization(node(0));

Review Comment:
   Wow, do we now have to do this in every test?
   I don't like that constructs like this appear in tests and have to be called explicitly.
   Can we at least call this in the cluster startup code so that we don't have to do it manually?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java:
##########
@@ -224,36 +227,34 @@ public CompletableFuture<Void> notifyWatches(long newRevision, List<Entry> updat
      * @return Updated value of {@link #notificationFuture}.
      */
     @VisibleForTesting
-    CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> asyncAction, Supplier<String> additionalInfoSupplier) {
-        while (true) {
-            CompletableFuture<Void> chainingFuture = new CompletableFuture<>();
-
-            CompletableFuture<Void> newNotificationFuture = chainingFuture
+    CompletableFuture<Void> enqueue(
+            Supplier<CompletableFuture<Void>> asyncAction,
+            Consumer<CompletableFuture<Void>> afterEnqueuing,
+            Supplier<String> additionalInfoSupplier
+    ) {
+        synchronized (notificationFutureMutex) {

Review Comment:
   Previously this was a non-blocking algorithm. Is that no longer necessary?
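
For reference, the kind of lock-free enqueue I have in mind looks roughly like the sketch below. It is a simplified, hypothetical stand-in (the `NotificationChain` class and its `tail` field are made up for illustration and are not the actual `WatchProcessor` code), and it deliberately ignores error propagation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for the notification chain; not the actual WatchProcessor code. */
class NotificationChain {
    /** Last future in the chain; every new action is linked after it. */
    private final AtomicReference<CompletableFuture<Void>> tail =
            new AtomicReference<>(CompletableFuture.completedFuture(null));

    /** Lock-free enqueue: retry the CAS until this action is linked after the current tail. */
    CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> asyncAction) {
        while (true) {
            CompletableFuture<Void> prev = tail.get();

            // A fresh gate per attempt, so a failed CAS leaves nothing scheduled.
            CompletableFuture<Void> gate = new CompletableFuture<>();
            CompletableFuture<Void> next = gate.thenCompose(unused -> asyncAction.get());

            if (tail.compareAndSet(prev, next)) {
                // Open the gate once the previous chain member finishes.
                // Simplification: a failure of the previous member is not propagated.
                prev.whenComplete((res, err) -> gate.complete(null));

                return next;
            }
        }
    }
}
```

One thing this sketch makes visible: a CAS loop has no natural place to run an `afterEnqueuing` callback atomically with the swap, which may be the reason the PR switched to a mutex.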



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotificationEnqueuedListener.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Listener that gets invoked when {@link WatchProcessor}'s internal notification chain future is updated.
+ */
+@FunctionalInterface
+public interface NotificationEnqueuedListener {
+    /**
+     * Notifies this listener that {@link WatchProcessor}'s internal notification chain future is updated.
+     *
+     * <p>This must always be run under the same lock under which the notification future is updated.
+     *
+     * @param newNotificationFuture New notification future.
+     * @param entries Entries corresponding to the update (empty if the notification is not about a new revision,
+     *     but about Metastorage safe time advancement.
+     * @param timestamp Metastorage timestamp.
+     */
+    void onEnqueued(CompletableFuture<Void> newNotificationFuture, List<Entry> entries, HybridTimestamp timestamp);

Review Comment:
   Do I understand correctly that this is only needed for schema sync?
   It doesn't look safe for the general case.
   Do you think it would be easy to invoke some kind of callback before all notifications?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();

Review Comment:
   You can use `var` here.
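
A small sketch of what that would look like, assuming Java 10 or later (the `VarExample` class is hypothetical and only wraps the declaration so that it compiles on its own). The one thing to watch is that the type argument has to move to the right-hand side.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical wrapper class, used only to make the snippet compile standalone. */
class VarExample {
    static CompletableFuture<Void> newListenerFuture() {
        // With 'var', the diamond operator would infer CompletableFuture<Object>,
        // so the type argument is spelled out on the constructor call instead.
        var listenerFuture = new CompletableFuture<Void>();

        return listenerFuture;
    }
}
```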



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), ts);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(any());
+
+        listenerFuture.complete(null);
+
+        verify(watchEventHandlingCallback, timeout(SECONDS.toMillis(10))).onSafeTimeAdvanced(ts);
+    }
+
+    @Test
+    void metastoreSafeTimeGetsAdvancedAfterPreviousNotificationChainMembesAreFinished() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));

Review Comment:
   Maybe we should make `"foo".getBytes(UTF_8)` a constant?
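
Something along these lines, as a rough sketch. The names `WatchKeyConstants`, `FOO_KEY` and `MATCHES_FOO` are hypothetical, and `java.util.function.Predicate` is only an assumed stand-in for whatever predicate type `Watch` actually accepts.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Arrays;
import java.util.function.Predicate;

/** Hypothetical holder; in the test these would simply be fields of the test class. */
class WatchKeyConstants {
    /** Key shared by the tests; the array is mutable, so treat it as read-only. */
    static final byte[] FOO_KEY = "foo".getBytes(UTF_8);

    /** Same check as the inline lambda currently passed to Watch in each test. */
    static final Predicate<byte[]> MATCHES_FOO = key -> Arrays.equals(key, FOO_KEY);
}
```

The entry creation could then reuse `FOO_KEY` as well, so the key is spelled out in one place only.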



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), ts);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(any());
+
+        listenerFuture.complete(null);
+
+        verify(watchEventHandlingCallback, timeout(SECONDS.toMillis(10))).onSafeTimeAdvanced(ts);
+    }
+
+    @Test
+    void metastoreSafeTimeGetsAdvancedAfterPreviousNotificationChainMembesAreFinished() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();

Review Comment:
   You can use `var` here.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);

Review Comment:
   You can use `var` here.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), ts);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(any());

Review Comment:
   Waiting for only 100 ms can make the test flaky; 1 s would be better, by analogy with `CompletableFutureMatcher#willSucceedFast`.



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), ts);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(any());
+
+        listenerFuture.complete(null);
+
+        verify(watchEventHandlingCallback, timeout(SECONDS.toMillis(10))).onSafeTimeAdvanced(ts);
+    }
+
+    @Test
+    void metastoreSafeTimeGetsAdvancedAfterPreviousNotificationChainMembesAreFinished() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp entryTs = new HybridTimestamp(1, 2);

Review Comment:
   You can use `var` here.



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTracker.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Allows to work with schema safe time. This safe time stems from Metastorage safe time (as schemas are delivered using Metastorage),
+ * but it might grow faster as it only concerns schema-related (currently, it means Catalog-related) Metastorage updates.
+ *
+ * <p>If latest schema update happened in revision N and all listeners' notifications for that revision (including Catalog listeners)

Review Comment:
   I didn't quite understand this sentence. Could you rephrase it?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java:
##########
@@ -288,6 +289,48 @@ void nodeStoppingExceptionDoesNotTriggerFailureManager() {
         verify(failureManager, never()).process(any());
     }
 
+    @Test
+    void eventNotificationUpdatesMetastoreSafeTimeAfterNotifyingWatchListeners() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), ts);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(any());
+
+        listenerFuture.complete(null);
+
+        verify(watchEventHandlingCallback, timeout(SECONDS.toMillis(10))).onSafeTimeAdvanced(ts);
+    }
+
+    @Test
+    void metastoreSafeTimeGetsAdvancedAfterPreviousNotificationChainMembesAreFinished() {
+        CompletableFuture<Void> listenerFuture = new CompletableFuture<>();
+        WatchListener listener = mock(WatchListener.class);
+        when(listener.onUpdate(any())).thenReturn(listenerFuture);
+
+        watchProcessor.addWatch(new Watch(0, listener, key -> Arrays.equals(key, "foo".getBytes(UTF_8))));
+
+        var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, TIMESTAMP);
+        HybridTimestamp entryTs = new HybridTimestamp(1, 2);
+        HybridTimestamp laterTs = entryTs.addPhysicalTime(10);
+
+        watchProcessor.notifyWatches(1, List.of(entry1), entryTs);
+        watchProcessor.advanceSafeTime(() -> {}, laterTs);
+
+        verify(watchEventHandlingCallback, timeout(100).times(0)).onSafeTimeAdvanced(laterTs);

Review Comment:
   Same about the timeout.



##########
modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.server.NotificationEnqueuedListener;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Default implementation of {@link SchemaSafeTimeTracker}.
+ */
+public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteComponent, NotificationEnqueuedListener {
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> schemaSafeTime =
+            new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
+
+    private CompletableFuture<Void> schemaSafeTimeUpdateFuture = nullCompletedFuture();
+
+    private final Object futureMutex = new Object();

Review Comment:
   Maybe we don't need a blocking algorithm?



##########
modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.junit.jupiter.api.Test;
+
+class SchemaSafeTimeTrackerImplTest {
+    @Test
+    void closureCompletesFuturesWithNodeStoppingException() {
+        SchemaSafeTimeTrackerImpl tracker = new SchemaSafeTimeTrackerImpl();

Review Comment:
   You can use `var` here.



##########
modules/network-api/src/main/java/org/apache/ignite/internal/network/MessagingService.java:
##########
@@ -34,7 +34,7 @@ public interface MessagingService {
      * Send the given message asynchronously to the specific member without any delivery guarantees.
      *
      * @param recipient Recipient of the message.
-     * @param msg       Message which should be delivered.
+     * @param msg Message which should be delivered.

Review Comment:
   I would suggest not making changes in this class, so that later there is less to analyze when figuring out who changed which lines of code and why. At your discretion.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMetastorageDependencyTest.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.table.QualifiedName.DEFAULT_SCHEMA_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ItSchemaSyncMetastorageDependencyTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private IgniteImpl ignite;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeEach
+    void prepare() {
+        ignite = igniteImpl(0);
+
+        ignite.sql().executeScript("CREATE TABLE "
+                + TABLE_NAME
+                + " (ID INT PRIMARY KEY, VAL VARCHAR)");
+
+        // Do a put to make sure that replicas are started before we start blocking the metastorage progress.
+        assertThat(doPutAsync(ignite), willCompleteSuccessfully());
+    }
+
+    @Test
+    void longWatchHandlingForNonCatalogEntryDoesNotAffectSchemaSync() throws Exception {
+        ByteArray entryKey = new ByteArray(ItSchemaSyncMetastorageDependencyTest.class.getName());
+        ignite.metaStorageManager().registerExactWatch(entryKey, event -> neverCompletingFuture());
+
+        assertThat(ignite.metaStorageManager().put(entryKey, new byte[0]), willCompleteSuccessfully());
+
+        waitOutDelayDuration();
+
+        assertThat(doPutAsync(ignite), willSucceedIn(10, SECONDS));

Review Comment:
   I would suggest using `willCompleteSuccessfully`.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java:
##########
@@ -52,22 +52,22 @@ class SchemaSyncServiceImplTest extends BaseIgniteAbstractTest {
 
     @BeforeEach
     void createSchemaSyncService() {
-        schemaSyncService = new SchemaSyncServiceImpl(clusterTime, delayDurationMs);
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMs);
     }
 
     @Test
-    void waitsTillSchemaCompletenessSubtractingDelayDuration() {
+    void waitsOnSchemaSafeTimeTillSchemaCompletenessSubtractingDelayDuration() {
         HybridTimestamp ts = clock.now();
-        CompletableFuture<Void> clusterTimeFuture = new CompletableFuture<>();
+        CompletableFuture<Void> safeTimeFuture = new CompletableFuture<>();

Review Comment:
   You can use `var` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
