ibessonov commented on code in PR #2473:
URL: https://github.com/apache/ignite-3/pull/2473#discussion_r1312696492
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void
testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
}
+ /**
+ * The replication mechanism must not replicate commands for which schemas
are not yet available on the node
+ * to which replication happens (in Raft, it means that followers/learners
cannot receive commands that they
+ * cannot execute without waiting for schemas). This method tests that
snapshots bringing such commands are
+ * rejected, and that, when metadata catches up, the snapshot gets
successfully installed.
+ */
+ @Test
+ void laggingSchemasOnFollowerPreventSnapshotInstallation() throws
Exception {
+ cluster.startAndInit(3);
+
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ // Prepare the scene: force node 0 to be a leader, and node 2 to be a
follower.
+
+ final int leaderIndex = 0;
+ final int followerIndex = 2;
+
+ transferLeadershipOnSolePartitionTo(leaderIndex);
+ cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+ // Block AppendEntries from being accepted on the follower so that the
leader will have to use a snapshot.
+ blockIncomingAppendEntriesAt(followerIndex);
+
+ // Inhibit the MetaStorage on the follower to make snapshots not
eligible for installation.
+ WatchListenerInhibitor listenerInhibitor =
inhibitMetastorageListenersAt(followerIndex);
+
+ try {
+ // Add some data in a schema that is not yet available on the
follower
+ updateTableSchemaAt(leaderIndex);
+ putToTableAt(leaderIndex);
+
+ CountDownLatch installationRejected = installationRejectedLatch();
+ CountDownLatch snapshotInstalled =
snapshotInstalledLatch(followerIndex);
+
+ // Force InstallSnapshot to be used.
+ causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+ assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did
not see snapshot installation rejection");
+
+ assertThat("Snapshot was installed before unblocking",
snapshotInstalled.getCount(), is(not(0L)));
+
+ listenerInhibitor.stopInhibit();
+
+ assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not
see a snapshot installed");
+ } finally {
+ listenerInhibitor.stopInhibit();
+ }
+ }
+
+ private void updateTableSchemaAt(int nodeIndex) {
+ cluster.doInSession(nodeIndex, session -> {
+ session.execute(null, "alter table test add column added int");
+ });
+ }
+
+ private void putToTableAt(int nodeIndex) {
+ KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+ .tables()
+ .table("test")
+ .keyValueView();
+ kvView.put(null, Tuple.create().set("key", 1),
Tuple.create().set("val", "one"));
+ }
+
+ private void blockIncomingAppendEntriesAt(int nodeIndex) {
+ BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower =
installBlockingAppendEntriesProcessor(nodeIndex);
+
+ blockingProcessorOnFollower.startBlocking();
+ }
+
+ private WatchListenerInhibitor inhibitMetastorageListenersAt(int
nodeIndex) {
+ IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+ WatchListenerInhibitor listenerInhibitor =
WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+ listenerInhibitor.startInhibit();
+
+ return listenerInhibitor;
+ }
+
+ private CountDownLatch installationRejectedLatch() {
+ CountDownLatch installationRejected = new CountDownLatch(1);
+
+ copierLogInspector.addHandler(
+ event ->
event.getMessage().getFormattedMessage().startsWith("Metadata not yet
available, URI '"),
+ installationRejected::countDown
+ );
+
+ return installationRejected;
+ }
+
+ private BlockingAppendEntriesRequestProcessor
installBlockingAppendEntriesProcessor(int nodeIndex) {
Review Comment:
Can we have some unit tests maybe? We're not even trying to write them for
some reason
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]