rpuch commented on code in PR #2473:
URL: https://github.com/apache/ignite-3/pull/2473#discussion_r1314963833


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void 
testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
         assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
     }
 
+    /**
+     * The replication mechanism must not replicate commands for which schemas 
are not yet available on the node
+     * to which replication happens (in Raft, it means that followers/learners 
cannot receive commands that they
+     * cannot execute without waiting for schemas). This method tests that 
snapshots bringing such commands are
+     * rejected, and that, when metadata catches up, the snapshot gets 
successfully installed.
+     */
+    @Test
+    void laggingSchemasOnFollowerPreventSnapshotInstallation() throws 
Exception {
+        cluster.startAndInit(3);
+
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        // Prepare the scene: force node 0 to be a leader, and node 2 to be a 
follower.
+
+        final int leaderIndex = 0;
+        final int followerIndex = 2;
+
+        transferLeadershipOnSolePartitionTo(leaderIndex);
+        cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+        // Block AppendEntries from being accepted on the follower so that the 
leader will have to use a snapshot.
+        blockIncomingAppendEntriesAt(followerIndex);
+
+        // Inhibit the MetaStorage on the follower to make snapshots not 
eligible for installation.
+        WatchListenerInhibitor listenerInhibitor = 
inhibitMetastorageListenersAt(followerIndex);
+
+        try {
+            // Add some data in a schema that is not yet available on the 
follower
+            updateTableSchemaAt(leaderIndex);
+            putToTableAt(leaderIndex);
+
+            CountDownLatch installationRejected = installationRejectedLatch();
+            CountDownLatch snapshotInstalled = 
snapshotInstalledLatch(followerIndex);
+
+            // Force InstallSnapshot to be used.
+            causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+            assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did 
not see snapshot installation rejection");
+
+            assertThat("Snapshot was installed before unblocking", 
snapshotInstalled.getCount(), is(not(0L)));
+
+            listenerInhibitor.stopInhibit();
+
+            assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not 
see a snapshot installed");
+        } finally {
+            listenerInhibitor.stopInhibit();
+        }
+    }
+
+    private void updateTableSchemaAt(int nodeIndex) {
+        cluster.doInSession(nodeIndex, session -> {
+            session.execute(null, "alter table test add column added int");
+        });
+    }
+
+    private void putToTableAt(int nodeIndex) {
+        KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+                .tables()
+                .table("test")
+                .keyValueView();
+        kvView.put(null, Tuple.create().set("key", 1), 
Tuple.create().set("val", "one"));
+    }
+
+    private void blockIncomingAppendEntriesAt(int nodeIndex) {
+        BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = 
installBlockingAppendEntriesProcessor(nodeIndex);
+
+        blockingProcessorOnFollower.startBlocking();
+    }
+
+    private WatchListenerInhibitor inhibitMetastorageListenersAt(int 
nodeIndex) {
+        IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+        WatchListenerInhibitor listenerInhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+        listenerInhibitor.startInhibit();
+
+        return listenerInhibitor;
+    }
+
+    private CountDownLatch installationRejectedLatch() {
+        CountDownLatch installationRejected = new CountDownLatch(1);
+
+        copierLogInspector.addHandler(
+                event -> 
event.getMessage().getFormattedMessage().startsWith("Metadata not yet 
available, URI '"),
+                installationRejected::countDown
+        );
+
+        return installationRejected;
+    }
+
+    private BlockingAppendEntriesRequestProcessor 
installBlockingAppendEntriesProcessor(int nodeIndex) {

Review Comment:
   Added a unit test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to