ibessonov commented on code in PR #2473:
URL: https://github.com/apache/ignite-3/pull/2473#discussion_r1309819022
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -436,21 +457,23 @@ private void reanimateNode2AndWaitForSnapshotInstalled() throws InterruptedExcep
* on it for the sole table partition in the cluster.
*/
private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) throws InterruptedException {
+ CountDownLatch snapshotInstalledLatch = snapshotInstalledLatch(nodeIndex);
+
+ reanimateNode(nodeIndex);
+
+ assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
+ }
+
+ private CountDownLatch snapshotInstalledLatch(int nodeIndex) {
CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
- Handler handler = replicatorLogInspector.addHandler(
+ replicatorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().matches(
"Node .+ received InstallSnapshotResponse from .+_" +
nodeIndex + " .+ success=true"),
Review Comment:
I'd rather see `\w+` for names; dots scare me, I keep forgetting how they work.
Do they include spaces? If they do, how much time will the parser spend on backtracking in case of non-matching strings? These are my thoughts...
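For reference, in `java.util.regex` the `.` metacharacter matches any character except line terminators (unless `DOTALL` is set), so it does match spaces. By default `\w` matches only `[a-zA-Z_0-9]`. The sketch below compares the two pattern shapes on a hypothetical log line; the real `InstallSnapshotResponse` message format may differ. The suggestion only restricts the node names, so the middle fields still use `.+`. On backtracking: the pattern has no nested quantifiers, so a failing match costs polynomial, not exponential, time in the line length. That should be negligible for single log lines.

```java
import java.util.regex.Pattern;

public class SnapshotLogPatternDemo {
    /** Shape used in the PR: '.' matches any character except line terminators, spaces included. */
    public static Pattern dotPattern(int nodeIndex) {
        return Pattern.compile("Node .+ received InstallSnapshotResponse from .+_" + nodeIndex + " .+ success=true");
    }

    /** Reviewer's suggestion: node names restricted to word characters [a-zA-Z_0-9]. */
    public static Pattern wordPattern(int nodeIndex) {
        return Pattern.compile("Node \\w+ received InstallSnapshotResponse from \\w+_" + nodeIndex + " .+ success=true");
    }

    public static void main(String[] args) {
        // Hypothetical log lines; the second one has a space inside the first node name.
        String line = "Node node_0 received InstallSnapshotResponse from node_2 term=1 success=true";
        String spaced = "Node node 0 received InstallSnapshotResponse from node_2 term=1 success=true";

        System.out.println(dotPattern(2).matcher(line).matches());    // true
        System.out.println(wordPattern(2).matcher(line).matches());   // true
        System.out.println(dotPattern(2).matcher(spaced).matches());  // true: '.+' swallows the space
        System.out.println(wordPattern(2).matcher(spaced).matches()); // false: '\w+' stops at the space
    }
}
```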
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
}
+ /**
+ * The replication mechanism must not replicate commands for which schemas are not yet available on the node
+ * to which replication happens (in Raft, it means that followers/learners cannot receive commands that they
+ * cannot execute without waiting for schemas). This method tests that snapshots bringing such commands are
+ * rejected, and that, when metadata catches up, the snapshot gets successfully installed.
+ */
+ @Test
+ void laggingSchemasOnFollowerPreventSnapshotInstallation() throws Exception {
+ cluster.startAndInit(3);
+
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
+
Review Comment:
Any particular reason for this empty line?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
}
+ /**
+ * The replication mechanism must not replicate commands for which schemas are not yet available on the node
+ * to which replication happens (in Raft, it means that followers/learners cannot receive commands that they
+ * cannot execute without waiting for schemas). This method tests that snapshots bringing such commands are
+ * rejected, and that, when metadata catches up, the snapshot gets successfully installed.
+ */
+ @Test
+ void laggingSchemasOnFollowerPreventSnapshotInstallation() throws Exception {
+ cluster.startAndInit(3);
+
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
+
+ final int leaderIndex = 0;
+ final int followerIndex = 2;
+
+ transferLeadershipOnSolePartitionTo(leaderIndex);
+ cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+ // Block AppendEntries from being accepted on the follower so that the leader will have to use a snapshot.
+ blockIncomingAppendEntriesAt(followerIndex);
Review Comment:
Can the same effect be achieved by changing the number of replicas in the zone, or by a node filter (after log truncation)?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -215,6 +249,28 @@ private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
}
}
+ private boolean metadataIsSufficientlyComplete() {
+ return partitionSnapshotStorage.catalogService().latestCatalogVersion() >= snapshotMeta.requiredCatalogVersion();
+ }
+
+ private void logMetadataIsInsufficiencyAndSetError() {
+ LOG.warn(
+ "Metadata not yet available, URI '{}', required level {}; rejecting snapshot installation.",
+ this.snapshotUri,
+ snapshotMeta.requiredCatalogVersion()
+ );
+
+ String errorMessage = String.format(
+ "Metadata not yet available, URI '%s', required level %s; rejecting snapshot installation.",
+ this.snapshotUri,
+ snapshotMeta.requiredCatalogVersion()
+ );
+
+ if (isOk()) {
Review Comment:
Again, I don't quite get it: why do we print warnings/errors if everything's OK? Please comment this code, or fix it if it's wrong.
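Here is one plausible reading of the `if (isOk())` guard. It assumes `isOk()` follows JRaft's `Status` contract, where it returns true while no error has been recorded. Under that assumption, the guard means "record this error only if nothing has failed earlier", so a root cause that was already set is not overwritten. The class and its `setErrorIfOk` helper below are hypothetical and not the copier's API. A helper with a self-describing name, or a short comment at the call site, would make that intent explicit.

```java
/** Hypothetical stand-in for a JRaft-style status; for illustration only. */
public class FirstErrorWinsStatus {
    private int code;        // 0 means "no error recorded yet"
    private String message;

    public boolean isOk() {
        return code == 0;
    }

    public void setError(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public int code() {
        return code;
    }

    public String message() {
        return message;
    }

    /** Records an error only if none was recorded before, so the first (root) cause is preserved. */
    public void setErrorIfOk(int code, String message) {
        if (isOk()) {
            setError(code, message);
        }
    }

    public static void main(String[] args) {
        FirstErrorWinsStatus status = new FirstErrorWinsStatus();
        status.setErrorIfOk(1, "Metadata not yet available");
        status.setErrorIfOk(2, "Error when completing the copier"); // ignored: an error is already set
        System.out.println(status.code() + " " + status.message()); // 1 Metadata not yet available
    }
}
```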
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -142,7 +176,7 @@ public void join() throws InterruptedException {
LOG.error("Error when completing the copier", cause);
- if (!isOk()) {
+ if (isOk()) {
Review Comment:
What?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java:
##########
@@ -90,6 +94,7 @@ public PartitionSnapshotStorage(
String snapshotUri,
RaftOptions raftOptions,
PartitionAccess partition,
+ CatalogService catalogService,
Review Comment:
Why do we have a `CatalogAccess` abstraction if we still use `CatalogService` instead? I don't really get it.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -68,6 +71,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
+ /** Number of milliseconds that the follower is allowed to try to catch up the required catalog version. */
+ private static final int WAIT_FOR_METADATA_CATCHUP_MS = 3000;
Review Comment:
Why don't we create proper configuration? What's up with all these constants?
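The sketch below shows a bounded metadata wait of the kind the PR's `waitForMetadataWithTimeout()` performs. It takes the timeout as a parameter instead of reading a constant like `WAIT_FOR_METADATA_CATCHUP_MS`, so the value could come from configuration. It assumes a hypothetical `CatalogVersionWaiter` that is notified whenever a catalog version is applied locally; this is not Ignite's `CatalogService` API. The returned future completes with `true` once the required version arrives, or with `false` when the timeout elapses. It never fails with a timeout exception, so the caller can branch on the result, as the copier does with `metadataIsSufficientlyComplete()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not Ignite's CatalogService API. */
public class CatalogVersionWaiter {
    private int latestVersion;

    /** Pending waiters keyed by the catalog version they need. Guarded by {@code this}. */
    private final Map<Integer, CompletableFuture<Void>> waiters = new HashMap<>();

    public synchronized int latestVersion() {
        return latestVersion;
    }

    /** Invoked whenever a new catalog version becomes available locally. */
    public synchronized void onVersionApplied(int version) {
        latestVersion = Math.max(latestVersion, version);

        // Release every waiter whose required version has now been reached.
        waiters.entrySet().removeIf(e -> {
            if (e.getKey() <= latestVersion) {
                e.getValue().complete(null);
                return true;
            }
            return false;
        });
    }

    private synchronized CompletableFuture<Void> versionReached(int required) {
        if (latestVersion >= required) {
            return CompletableFuture.completedFuture(null);
        }
        return waiters.computeIfAbsent(required, v -> new CompletableFuture<>());
    }

    /** Completes with true if {@code required} is reached within {@code timeoutMs}, false otherwise. */
    public CompletableFuture<Boolean> waitForVersion(int required, long timeoutMs) {
        return versionReached(required)
                .thenApply(unused -> true)
                .completeOnTimeout(false, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CatalogVersionWaiter waiter = new CatalogVersionWaiter();
        waiter.onVersionApplied(1);

        CompletableFuture<Boolean> caughtUp = waiter.waitForVersion(2, 3000);
        waiter.onVersionApplied(2);
        System.out.println(caughtUp.join()); // true

        System.out.println(waiter.waitForVersion(5, 50).join()); // false
    }
}
```

`completeOnTimeout` requires Java 9 or later.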
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -215,6 +249,28 @@ private CompletableFuture<?> loadSnapshotMeta(ClusterNode snapshotSender) {
}
}
+ private boolean metadataIsSufficientlyComplete() {
+ return partitionSnapshotStorage.catalogService().latestCatalogVersion() >= snapshotMeta.requiredCatalogVersion();
+ }
+
+ private void logMetadataIsInsufficiencyAndSetError() {
+ LOG.warn(
+ "Metadata not yet available, URI '{}', required level {}; rejecting snapshot installation.",
Review Comment:
Is this the way we format logs? I thought it should be more like "Foo [a=1, b=2]"; there are guidelines somewhere.
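Here is a sketch that addresses this comment and also the duplicated format strings in `logMetadataIsInsufficiencyAndSetError()`. It builds the message once, in the bracketed `[key=value]` shape the reviewer recalls, and reuses it for both the warning and the error status. The method name, the parameter names, and the `int` type of the required version are assumptions. The test's `installationRejectedLatch()` matches on the prefix `Metadata not yet available, URI '`, so it would have to be updated to the new prefix.

```java
public class SnapshotRejectionMessage {
    /** Builds the rejection message once so the log line and the error status cannot drift apart. */
    public static String metadataNotAvailable(String snapshotUri, int requiredCatalogVersion) {
        return String.format(
                "Metadata not yet available, rejecting snapshot installation [uri=%s, requiredCatalogVersion=%d]",
                snapshotUri, requiredCatalogVersion);
    }

    public static void main(String[] args) {
        String message = metadataNotAvailable("snapshot-uri", 3);
        // In the copier, the same string could be passed to LOG.warn(...) and to the error status.
        System.out.println(message);
    }
}
```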
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -792,6 +816,116 @@ void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Ex
assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
}
+ /**
+ * The replication mechanism must not replicate commands for which schemas are not yet available on the node
+ * to which replication happens (in Raft, it means that followers/learners cannot receive commands that they
+ * cannot execute without waiting for schemas). This method tests that snapshots bringing such commands are
+ * rejected, and that, when metadata catches up, the snapshot gets successfully installed.
+ */
+ @Test
+ void laggingSchemasOnFollowerPreventSnapshotInstallation() throws Exception {
+ cluster.startAndInit(3);
+
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ // Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
+
+ final int leaderIndex = 0;
+ final int followerIndex = 2;
+
+ transferLeadershipOnSolePartitionTo(leaderIndex);
+ cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+ // Block AppendEntries from being accepted on the follower so that the leader will have to use a snapshot.
+ blockIncomingAppendEntriesAt(followerIndex);
+
+ // Inhibit the MetaStorage on the follower to make snapshots not eligible for installation.
+ WatchListenerInhibitor listenerInhibitor = inhibitMetastorageListenersAt(followerIndex);
+
+ try {
+ // Add some data in a schema that is not yet available on the follower
+ updateTableSchemaAt(leaderIndex);
+ putToTableAt(leaderIndex);
+
+ CountDownLatch installationRejected = installationRejectedLatch();
+ CountDownLatch snapshotInstalled = snapshotInstalledLatch(followerIndex);
+
+ // Force InstallSnapshot to be used.
+ causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+ assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did not see snapshot installation rejection");
+
+ assertThat("Snapshot was installed before unblocking", snapshotInstalled.getCount(), is(not(0L)));
+
+ listenerInhibitor.stopInhibit();
+
+ assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not see a snapshot installed");
+ } finally {
+ listenerInhibitor.stopInhibit();
+ }
+ }
+
+ private void updateTableSchemaAt(int nodeIndex) {
+ cluster.doInSession(nodeIndex, session -> {
+ session.execute(null, "alter table test add column added int");
+ });
+ }
+
+ private void putToTableAt(int nodeIndex) {
+ KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+ .tables()
+ .table("test")
+ .keyValueView();
+ kvView.put(null, Tuple.create().set("key", 1), Tuple.create().set("val", "one"));
+ }
+
+ private void blockIncomingAppendEntriesAt(int nodeIndex) {
+ BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = installBlockingAppendEntriesProcessor(nodeIndex);
+
+ blockingProcessorOnFollower.startBlocking();
+ }
+
+ private WatchListenerInhibitor inhibitMetastorageListenersAt(int nodeIndex) {
+ IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+ WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+ listenerInhibitor.startInhibit();
+
+ return listenerInhibitor;
+ }
+
+ private CountDownLatch installationRejectedLatch() {
+ CountDownLatch installationRejected = new CountDownLatch(1);
+
+ copierLogInspector.addHandler(
+ event -> event.getMessage().getFormattedMessage().startsWith("Metadata not yet available, URI '"),
+ installationRejected::countDown
+ );
+
+ return installationRejected;
+ }
+
+ private BlockingAppendEntriesRequestProcessor installBlockingAppendEntriesProcessor(int nodeIndex) {
Review Comment:
These tests become more and more complicated over time; any ideas for improvements?
Too much reflection for my taste.
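Here is one possible direction on the duplication side; it is a sketch, not something proposed in the PR. `snapshotInstalledLatch()` and `installationRejectedLatch()` follow the same steps: create a latch, register a log handler that counts it down, and return the latch. A single generic helper could replace both. `LatchingLogInspector` below is hypothetical and only loosely mirrors the tests' `addHandler(predicate, action)` calls, using a `Predicate<String>` over formatted messages. It does not address the reflection used to install the blocking processor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical, minimal stand-in for the tests' log inspector; not the real class. */
public class LatchingLogInspector {
    private static final class Handler {
        final Predicate<String> predicate;
        final Runnable action;

        Handler(Predicate<String> predicate, Runnable action) {
            this.predicate = predicate;
            this.action = action;
        }
    }

    private final List<Handler> handlers = new CopyOnWriteArrayList<>();

    public void addHandler(Predicate<String> predicate, Runnable action) {
        handlers.add(new Handler(predicate, action));
    }

    /** In a real setup, the logging backend would call this for every formatted message. */
    public void onMessage(String formattedMessage) {
        for (Handler handler : handlers) {
            if (handler.predicate.test(formattedMessage)) {
                handler.action.run();
            }
        }
    }

    /** One generic helper instead of a dedicated xxxLatch() method per scenario. */
    public CountDownLatch latchOn(Predicate<String> predicate) {
        CountDownLatch latch = new CountDownLatch(1);
        addHandler(predicate, latch::countDown);
        return latch;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchingLogInspector inspector = new LatchingLogInspector();
        CountDownLatch rejected = inspector.latchOn(msg -> msg.startsWith("Metadata not yet available"));

        inspector.onMessage("Some unrelated message");
        System.out.println(rejected.getCount()); // 1

        inspector.onMessage("Metadata not yet available, URI 'x', required level 3; rejecting snapshot installation.");
        System.out.println(rejected.await(1, TimeUnit.SECONDS)); // true
    }
}
```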
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -121,13 +127,41 @@ public void start() {
}
return loadSnapshotMeta(snapshotSender)
- .thenCompose(unused1 -> loadSnapshotMvData(snapshotSender, executor))
- .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+ // Give metadata some time to catch up as it's very probable that the leader is ahead metadata-wise.
+ .thenCompose(unused2 -> waitForMetadataWithTimeout())
+ .thenCompose(unused2 -> {
+ if (metadataIsSufficientlyComplete()) {
+ return loadSnapshotMvData(snapshotSender, executor)
+ .thenCompose(unused1 -> loadSnapshotTxData(snapshotSender, executor));
+ } else {
+ logMetadataIsInsufficiencyAndSetError();
Review Comment:
"is insufficiency" - I think you don't need "is"