sashapolo commented on code in PR #5588:
URL: https://github.com/apache/ignite-3/pull/5588#discussion_r2039734198


##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -213,4 +372,50 @@ private static CompletableFuture<Void> truncateLog(Node node, ReplicationGroupId
         return node.replicaManager.replica(groupId)
                 .thenCompose(replica -> replica.createSnapshotOn(member, true));
     }
+
+    private static @Nullable MvTableStorage tableStorage(Node node, int tableId) {
+        TableImpl table = node.tableManager.startedTables().get(tableId);
+
+        return table == null ? null : table.internalTable().storage();
+    }
+
+    private static CompletableFuture<Void> startBlockOutgoingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {
+        return startBlockingMessages(node, partitionId, InstallSnapshotRequest.class);
+    }
+
+    private static void stopBlockingMessages(Node node, ZonePartitionId partitionId) {
+        var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name));
+
+        ((JraftServerImpl) node.raftManager.server()).stopBlockMessages(raftNodeId);
+    }
+
+    private static CompletableFuture<Void> startBlockIncomingSnapshot(Node node, ZonePartitionId partitionId) throws InterruptedException {
+        return startBlockingMessages(node, partitionId, InstallSnapshotResponse.class);
+    }
+
+    private static CompletableFuture<Void> startBlockingMessages(
+            Node node,
+            ZonePartitionId partitionId,
+            Class<? extends Message> messageType
+    ) throws InterruptedException {
+        var raftServer = (JraftServerImpl) node.raftManager.server();
+
+        var raftNodeId = new RaftNodeId(partitionId, new Peer(node.name));
+
+        // Wait for the Raft node to start.
+        assertTrue(waitForCondition(() -> raftServer.raftGroupService(raftNodeId) != null, 10_000));
+
+        var conditionReachedFuture = new CompletableFuture<Void>();
+
+        raftServer.blockMessages(raftNodeId, (msg, peerId) -> {
+            if (messageType.isInstance(msg)) {
+                conditionReachedFuture.complete(null);
+
+                return true;
+            }
+            return false;
+        });
+
+        return conditionReachedFuture;
+    }

Review Comment:
   > First test seems to make sure that it destroys the table before the 
snapshot even starts to flow.
   
   Not really: it waits until the snapshot is just about to start flowing, and 
then destroys the storage. It's a somewhat probabilistic test, because the 
storage gets destroyed "somewhere" during snapshot installation. I wasn't able 
to make it more precise.
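
To illustrate the "wait until the snapshot is about to flow" step, here is a minimal, self-contained sketch of the predicate pattern used by `startBlockingMessages`. It does not use the Ignite or JRaft classes: `Message`, `InstallSnapshotRequest`, `Heartbeat`, and `blockingPredicate` are hypothetical stand-ins defined only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;

public class BlockOnFirstMatch {
    /** Hypothetical stand-ins for Raft message types (not the real JRaft classes). */
    interface Message {}

    static final class InstallSnapshotRequest implements Message {}

    static final class Heartbeat implements Message {}

    /**
     * Builds a predicate that blocks (returns true for) every message of the given type
     * and completes the future the first time such a message is seen.
     */
    static BiPredicate<Message, String> blockingPredicate(
            Class<? extends Message> messageType,
            CompletableFuture<Void> conditionReached
    ) {
        return (msg, peerId) -> {
            if (messageType.isInstance(msg)) {
                // Signal the test that the snapshot message has appeared.
                conditionReached.complete(null);

                return true;
            }

            return false;
        };
    }

    public static void main(String[] args) {
        var reached = new CompletableFuture<Void>();
        var predicate = blockingPredicate(InstallSnapshotRequest.class, reached);

        // Unrelated messages pass through and do not complete the future.
        System.out.println(predicate.test(new Heartbeat(), "peer-1")); // false
        System.out.println(reached.isDone()); // false

        // The first snapshot message is blocked and completes the future.
        System.out.println(predicate.test(new InstallSnapshotRequest(), "peer-1")); // true
        System.out.println(reached.isDone()); // true
    }
}
```

The future only tells the test that a snapshot message has been intercepted. Anything the test does after unblocking (such as destroying the storage) races with the installation itself, which is why the timing is not exact.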



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
