ibessonov commented on code in PR #2473:
URL: https://github.com/apache/ignite-3/pull/2473#discussion_r1309819022


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -436,21 +457,23 @@ private void reanimateNode2AndWaitForSnapshotInstalled() 
throws InterruptedExcep
      * on it for the sole table partition in the cluster.
      */
     private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) 
throws InterruptedException {
+        CountDownLatch snapshotInstalledLatch = 
snapshotInstalledLatch(nodeIndex);
+
+        reanimateNode(nodeIndex);
+
+        assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did 
not install a snapshot in time");
+    }
+
+    private CountDownLatch snapshotInstalledLatch(int nodeIndex) {
         CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
 
-        Handler handler = replicatorLogInspector.addHandler(
+        replicatorLogInspector.addHandler(
                 evt -> evt.getMessage().getFormattedMessage().matches(
                         "Node .+ received InstallSnapshotResponse from .+_" + 
nodeIndex + " .+ success=true"),

Review Comment:
   I'd rather see `\w+` for names; dots scare me, because I keep forgetting how 
they work.
   Do they match spaces? If they do, how much time will the matcher spend on 
backtracking on non-matching strings? These are my thoughts...
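
   To make the question concrete, here is a minimal sketch comparing the two 
   styles. The log lines are hypothetical, not copied from real JRaft output, 
   and `SnapshotLogPatterns` is an illustrative name. In `java.util.regex`, `.` 
   matches any character except line terminators (so spaces are included), 
   while `\w` matches only `[a-zA-Z_0-9]`. The pattern has no nested 
   quantifiers, so backtracking on a non-matching line should stay polynomial 
   in the line length rather than exponential.

```java
import java.util.regex.Pattern;

final class SnapshotLogPatterns {
    /** Pattern in the style used by the test: '.' also matches spaces. */
    static Pattern loose(int nodeIndex) {
        return Pattern.compile(
                "Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true");
    }

    /** Variant with \w+ for names: word characters only, so a name cannot span spaces. */
    static Pattern strict(int nodeIndex) {
        return Pattern.compile(
                "Node \\w+ received InstallSnapshotResponse from \\w+_" + nodeIndex + " .+ success=true");
    }

    public static void main(String[] args) {
        // Hypothetical log lines used only for illustration.
        String plain = "Node leader_0 received InstallSnapshotResponse from node_2 term=1 success=true";
        String spaced = "Node my leader received InstallSnapshotResponse from node_2 term=1 success=true";

        System.out.println("loose/plain:   " + loose(2).matcher(plain).matches());
        System.out.println("strict/plain:  " + strict(2).matcher(plain).matches());
        System.out.println("loose/spaced:  " + loose(2).matcher(spaced).matches());
        System.out.println("strict/spaced: " + strict(2).matcher(spaced).matches());
    }
}
```

   Whether `\w+` is safe here depends on the real node name format: names 
   containing `-`, `.` or `/` would no longer match, and `\S+` might be a 
   middle ground.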



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void 
testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
         assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
     }
 
+    /**
+     * The replication mechanism must not replicate commands for which schemas 
are not yet available on the node
+     * to which replication happens (in Raft, it means that followers/learners 
cannot receive commands that they
+     * cannot execute without waiting for schemas). This method tests that 
snapshots bringing such commands are
+     * rejected, and that, when metadata catches up, the snapshot gets 
successfully installed.
+     */
+    @Test
+    void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a 
follower.
+

Review Comment:
   Any particular reason for this empty line?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void 
testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
         assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
     }
 
+    /**
+     * The replication mechanism must not replicate commands for which schemas 
are not yet available on the node
+     * to which replication happens (in Raft, it means that followers/learners 
cannot receive commands that they
+     * cannot execute without waiting for schemas). This method tests that 
snapshots bringing such commands are
+     * rejected, and that, when metadata catches up, the snapshot gets 
successfully installed.
+     */
+    @Test
+    void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a 
follower.
+
+        final int leaderIndex = 0;
+        final int followerIndex = 2;
+
+        transferLeadershipOnSolePartitionTo(leaderIndex);
+        cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+        // Block AppendEntries from being accepted on the follower so that the 
leader will have to use a snapshot.
+        blockIncomingAppendEntriesAt(followerIndex);

Review Comment:
   Can the same effect be achieved by changing the number of replicas in the 
zone, or by using a node filter (after log truncation)?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -215,6 +249,28 @@ private CompletableFuture<?> loadSnapshotMeta(ClusterNode 
snapshotSender) {
         }
     }
 
+    private boolean metadataIsSufficientlyComplete() {
+        return 
partitionSnapshotStorage.catalogService().latestCatalogVersion() >= 
snapshotMeta.requiredCatalogVersion();
+    }
+
+    private void logMetadataIsInsufficiencyAndSetError() {
+        LOG.warn(
+                "Metadata not yet available, URI '{}', required level {}; 
rejecting snapshot installation.",
+                this.snapshotUri,
+                snapshotMeta.requiredCatalogVersion()
+        );
+
+        String errorMessage = String.format(
+                "Metadata not yet available, URI '%s', required level %s; 
rejecting snapshot installation.",
+                this.snapshotUri,
+                snapshotMeta.requiredCatalogVersion()
+        );
+
+        if (isOk()) {

Review Comment:
   Again, I don't quite get it: why do we print warnings/errors if everything's 
OK? Please comment this code, or fix it if it's wrong.
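
   One possible reading, which is an assumption and not something the diff 
   confirms: `isOk()` means "no error has been recorded yet", and the guard 
   keeps the first recorded error from being overwritten by a later one. A 
   minimal sketch of that pattern, with a hypothetical `FirstErrorHolder` class 
   standing in for the copier's status handling:

```java
final class FirstErrorHolder {
    /** Null means no error has been recorded so far. */
    private String error;

    boolean isOk() {
        return error == null;
    }

    /** Records the error only if none was recorded before, so the first failure wins. */
    void setErrorIfOk(String message) {
        if (isOk()) {
            error = message;
        }
    }

    String error() {
        return error;
    }

    public static void main(String[] args) {
        FirstErrorHolder holder = new FirstErrorHolder();

        holder.setErrorIfOk("Metadata not yet available");
        holder.setErrorIfOk("Copier was cancelled"); // Ignored: an error is already set.

        System.out.println(holder.error());
    }
}
```

   If that is the intent, a one-line comment next to the `isOk()` check would 
   answer the question. If the intent is the opposite, the condition is 
   inverted.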



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -142,7 +176,7 @@ public void join() throws InterruptedException {
 
                 LOG.error("Error when completing the copier", cause);
 
-                if (!isOk()) {
+                if (isOk()) {

Review Comment:
   What?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -90,6 +94,7 @@ public PartitionSnapshotStorage(
             String snapshotUri,
             RaftOptions raftOptions,
             PartitionAccess partition,
+            CatalogService catalogService,

Review Comment:
   Why do we have a `CatalogAccess` abstraction if we still use 
`CatalogService` instead? I don't really get it.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -68,6 +71,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
 
+    /** Number of milliseconds that the follower is allowed to try to catch up 
the required catalog version. */
+    private static final int WAIT_FOR_METADATA_CATCHUP_MS = 3000;

Review Comment:
   Why don't we create a proper configuration? What's up with all these constants?
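
   As a rough sketch of the alternative: the timeout could be passed in from 
   wherever the configuration lives instead of being a constant. The class and 
   method names below are hypothetical and this is not the PR's 
   `waitForMetadataWithTimeout()`; only `CompletableFuture.completeOnTimeout` 
   (Java 9+) is a real JDK call.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class MetadataCatchupWait {
    /** Injected by the caller (for example, read from configuration) rather than hardcoded. */
    private final Duration timeout;

    MetadataCatchupWait(Duration timeout) {
        this.timeout = timeout;
    }

    /** Completes with true if the metadata future completes in time, and with false on timeout. */
    CompletableFuture<Boolean> awaitCatchup(CompletableFuture<Void> metadataReady) {
        return metadataReady
                .thenApply(unused -> true)
                .completeOnTimeout(false, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        MetadataCatchupWait wait = new MetadataCatchupWait(Duration.ofMillis(100));

        // Already completed: metadata is available.
        System.out.println(wait.awaitCatchup(CompletableFuture.completedFuture(null)).join());

        // Never completes: the wait gives up after the configured timeout.
        System.out.println(wait.awaitCatchup(new CompletableFuture<>()).join());
    }
}
```

   With the value injected, tests could also use a much shorter timeout than 
   production.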



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -215,6 +249,28 @@ private CompletableFuture<?> loadSnapshotMeta(ClusterNode 
snapshotSender) {
         }
     }
 
+    private boolean metadataIsSufficientlyComplete() {
+        return 
partitionSnapshotStorage.catalogService().latestCatalogVersion() >= 
snapshotMeta.requiredCatalogVersion();
+    }
+
+    private void logMetadataIsInsufficiencyAndSetError() {
+        LOG.warn(
+                "Metadata not yet available, URI '{}', required level {}; 
rejecting snapshot installation.",

Review Comment:
   Is this the way we format logs? I thought it should be more like "Foo [a=1, 
b=2]"; there are guidelines somewhere.
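
   For illustration, the message from this hunk could look as follows in the 
   "Foo [a=1, b=2]" shape. The key names `uri` and `requiredCatalogVersion`, 
   the `int` type of the version, and the helper class are assumptions, and the 
   exact wording should follow whatever the guidelines prescribe.

```java
final class SnapshotLogMessages {
    /** Builds the message with a short description followed by bracketed key=value pairs. */
    static String metadataNotAvailable(String snapshotUri, int requiredCatalogVersion) {
        return String.format(
                "Metadata not yet available, rejecting snapshot installation [uri=%s, requiredCatalogVersion=%d]",
                snapshotUri,
                requiredCatalogVersion
        );
    }

    public static void main(String[] args) {
        // Hypothetical URI and version, for illustration only.
        System.out.println(metadataNotAvailable("remote://node0/snapshot", 5));
    }
}
```

   Note that the test in this PR matches the log line with 
   `startsWith("Metadata not yet available, URI '")`, so changing the message 
   format would require updating that matcher as well.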



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void 
testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
         assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
     }
 
+    /**
+     * The replication mechanism must not replicate commands for which schemas 
are not yet available on the node
+     * to which replication happens (in Raft, it means that followers/learners 
cannot receive commands that they
+     * cannot execute without waiting for schemas). This method tests that 
snapshots bringing such commands are
+     * rejected, and that, when metadata catches up, the snapshot gets 
successfully installed.
+     */
+    @Test
+    void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a 
follower.
+
+        final int leaderIndex = 0;
+        final int followerIndex = 2;
+
+        transferLeadershipOnSolePartitionTo(leaderIndex);
+        cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+        // Block AppendEntries from being accepted on the follower so that the 
leader will have to use a snapshot.
+        blockIncomingAppendEntriesAt(followerIndex);
+
+        // Inhibit the MetaStorage on the follower to make snapshots not 
eligible for installation.
+        WatchListenerInhibitor listenerInhibitor = 
inhibitMetastorageListenersAt(followerIndex);
+
+        try {
+            // Add some data in a schema that is not yet available on the 
follower
+            updateTableSchemaAt(leaderIndex);
+            putToTableAt(leaderIndex);
+
+            CountDownLatch installationRejected = installationRejectedLatch();
+            CountDownLatch snapshotInstalled = 
snapshotInstalledLatch(followerIndex);
+
+            // Force InstallSnapshot to be used.
+            causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+            assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did 
not see snapshot installation rejection");
+
+            assertThat("Snapshot was installed before unblocking", 
snapshotInstalled.getCount(), is(not(0L)));
+
+            listenerInhibitor.stopInhibit();
+
+            assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not 
see a snapshot installed");
+        } finally {
+            listenerInhibitor.stopInhibit();
+        }
+    }
+
+    private void updateTableSchemaAt(int nodeIndex) {
+        cluster.doInSession(nodeIndex, session -> {
+            session.execute(null, "alter table test add column added int");
+        });
+    }
+
+    private void putToTableAt(int nodeIndex) {
+        KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+                .tables()
+                .table("test")
+                .keyValueView();
+        kvView.put(null, Tuple.create().set("key", 1), 
Tuple.create().set("val", "one"));
+    }
+
+    private void blockIncomingAppendEntriesAt(int nodeIndex) {
+        BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = 
installBlockingAppendEntriesProcessor(nodeIndex);
+
+        blockingProcessorOnFollower.startBlocking();
+    }
+
+    private WatchListenerInhibitor inhibitMetastorageListenersAt(int 
nodeIndex) {
+        IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+        WatchListenerInhibitor listenerInhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+        listenerInhibitor.startInhibit();
+
+        return listenerInhibitor;
+    }
+
+    private CountDownLatch installationRejectedLatch() {
+        CountDownLatch installationRejected = new CountDownLatch(1);
+
+        copierLogInspector.addHandler(
+                event -> 
event.getMessage().getFormattedMessage().startsWith("Metadata not yet 
available, URI '"),
+                installationRejected::countDown
+        );
+
+        return installationRejected;
+    }
+
+    private BlockingAppendEntriesRequestProcessor 
installBlockingAppendEntriesProcessor(int nodeIndex) {

Review Comment:
   These tests become more and more complicated with time. Any ideas for 
improvements?
   Too much reflection for my taste.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -121,13 +127,41 @@ public void start() {
                     }
 
                     return loadSnapshotMeta(snapshotSender)
-                            .thenCompose(unused1 -> 
loadSnapshotMvData(snapshotSender, executor))
-                            .thenCompose(unused1 -> 
loadSnapshotTxData(snapshotSender, executor));
+                            // Give metadata some time to catch up as it's 
very probable that the leader is ahead metadata-wise.
+                            .thenCompose(unused2 -> 
waitForMetadataWithTimeout())
+                            .thenCompose(unused2 -> {
+                                if (metadataIsSufficientlyComplete()) {
+                                    return loadSnapshotMvData(snapshotSender, 
executor)
+                                            .thenCompose(unused1 -> 
loadSnapshotTxData(snapshotSender, executor));
+                                } else {
+                                    logMetadataIsInsufficiencyAndSetError();

Review Comment:
   "is insufficiency" - I think you don't need "is"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
