denis-chudov commented on code in PR #6192: URL: https://github.com/apache/ignite-3/pull/6192#discussion_r2192320647
########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { Review Comment: This test is flaky, please fix it ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -153,6 +259,24 @@ private static Function<Ignite, RaftGroupService> toRaftClient(String tableName) }; } + private static Function<Ignite, ReplicaListener> toReplicaListener(String zoneName, String tableName, int partId) { Review Comment: ```suggestion private static Function<Ignite, ReplicaListener> getReplicaListener(String zoneName, String tableName, int partId) { ``` ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); + + cluster.startNode((int) cluster.runningNodes().count()); // starting a new node adds a learner + AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>(); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Set<Assignment> stableAssignments = stablePartitionAssignments(cluster.node(0), "TEST"); + String learner = stableAssignments.stream().filter(a -> !a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null); + assertTrue(learner != null && !learner.isBlank(), "learners found 
after staring new node"); + learnerRef.set(findNode(node -> node.name().equals(learner))); + }); + + IgniteImpl learner = learnerRef.get(); + ReplicaListener replicaListener = of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 0)).orElseThrow(); + ClusterNode learnerClusterNode = learner.clusterService().topologyService().localMember(); + SchemaDescriptor schema = unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema(); + KvMarshaller<Integer, Integer> marshaller = new ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class); + Row pk = marshaller.marshal(0); + + CompletableFuture<ReplicaResult> singleRowResult = replicaListener.invoke( + readOnlySingleRowPkReplicaRequest(learner, "TEST_ZONE", "TEST", 0, pk), + learnerClusterNode.id() + ); + assertThat(singleRowResult, willCompleteSuccessfully()); + Row singleRow = Row.wrapBinaryRow(schema, (BinaryRow) singleRowResult.join().result()); + assertThat(marshaller.unmarshalKey(singleRow), equalTo(0)); + assertThat(marshaller.unmarshalValue(singleRow), equalTo(0)); + + CompletableFuture<ReplicaResult> multiRowResult = replicaListener.invoke( + readOnlyMultiRowPkReplicaRequest(learner, "TEST_ZONE", "TEST", 0, pk), + learnerClusterNode.id() + ); + assertThat(multiRowResult, willCompleteSuccessfully()); + Row multiRow = Row.wrapBinaryRow(schema, ((List<BinaryRow>) multiRowResult.join().result()).get(0)); + assertThat(marshaller.unmarshalKey(multiRow), equalTo(0)); + assertThat(marshaller.unmarshalValue(multiRow), equalTo(0)); + + CompletableFuture<ReplicaResult> scanResult = replicaListener.invoke( + readOnlyScanRetrieveBatchReplicaRequest(learner, "TEST_ZONE", "TEST", 0), + learnerClusterNode.id() + ); + assertThat(scanResult, willCompleteSuccessfully()); + Row scanRow = Row.wrapBinaryRow(schema, ((List<BinaryRow>) scanResult.join().result()).get(0)); + assertThat(marshaller.unmarshalKey(scanRow), equalTo(0)); + assertThat(marshaller.unmarshalValue(scanRow), 
equalTo(0)); + } + + private IgniteImpl findNode(Predicate<? super Ignite> predicate) { Review Comment: You can move this to ClusterPerTestIntegrationTest ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); + + cluster.startNode((int) cluster.runningNodes().count()); // starting a new node adds a learner + AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>(); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Set<Assignment> stableAssignments = stablePartitionAssignments(cluster.node(0), "TEST"); + String learner = stableAssignments.stream().filter(a -> !a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null); + assertTrue(learner != null && !learner.isBlank(), "learners found after staring new node"); + learnerRef.set(findNode(node -> node.name().equals(learner))); + }); + + IgniteImpl learner = learnerRef.get(); + ReplicaListener replicaListener = of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 0)).orElseThrow(); Review Comment: It seems to me, that this line may be also flaky sometimes. 
There is no linearization between the appearance of the replica on the learner and this line. ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); + + cluster.startNode((int) cluster.runningNodes().count()); // starting a new node adds a learner + AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>(); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Set<Assignment> stableAssignments = stablePartitionAssignments(cluster.node(0), "TEST"); + String learner = stableAssignments.stream().filter(a -> !a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null); + assertTrue(learner != null && !learner.isBlank(), "learners found after staring new node"); + learnerRef.set(findNode(node -> node.name().equals(learner))); + }); + + IgniteImpl learner = learnerRef.get(); + ReplicaListener replicaListener = of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 0)).orElseThrow(); + ClusterNode learnerClusterNode = learner.clusterService().topologyService().localMember(); + SchemaDescriptor schema = unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema(); + KvMarshaller<Integer, Integer> marshaller = new ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class); + Row pk = marshaller.marshal(0); Review Comment: I would recommend making a separate serialization method (or a method returning the marshaller) ########## 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); Review Comment: Let's use non-zero values here ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); + + cluster.startNode((int) cluster.runningNodes().count()); // starting a new node adds a learner Review Comment: You can override `ClusterPerTestIntegrationTest#initialNodes` with value `4` ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java: ########## @@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() { }); } - private static Set<Assignment> getStableAssignments(Ignite node, String tableName) { + @Test + void testLearnerReplicaReadOnlyRequestCanReadData() { + executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM SIZE 2) STORAGE PROFILES ['default']"); + executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE TEST_ZONE"); + executeSql("INSERT INTO TEST VALUES (0, 0)"); + 
+ cluster.startNode((int) cluster.runningNodes().count()); // starting a new node adds a learner + AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>(); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Set<Assignment> stableAssignments = stablePartitionAssignments(cluster.node(0), "TEST"); + String learner = stableAssignments.stream().filter(a -> !a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null); + assertTrue(learner != null && !learner.isBlank(), "learners found after staring new node"); + learnerRef.set(findNode(node -> node.name().equals(learner))); + }); + + IgniteImpl learner = learnerRef.get(); + ReplicaListener replicaListener = of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 0)).orElseThrow(); + ClusterNode learnerClusterNode = learner.clusterService().topologyService().localMember(); + SchemaDescriptor schema = unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema(); + KvMarshaller<Integer, Integer> marshaller = new ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class); + Row pk = marshaller.marshal(0); + + CompletableFuture<ReplicaResult> singleRowResult = replicaListener.invoke( + readOnlySingleRowPkReplicaRequest(learner, "TEST_ZONE", "TEST", 0, pk), + learnerClusterNode.id() + ); + assertThat(singleRowResult, willCompleteSuccessfully()); + Row singleRow = Row.wrapBinaryRow(schema, (BinaryRow) singleRowResult.join().result()); + assertThat(marshaller.unmarshalKey(singleRow), equalTo(0)); + assertThat(marshaller.unmarshalValue(singleRow), equalTo(0)); Review Comment: There is also no happens-before relation between the replication of the value to the learner and these checks, so they may become flaky as well. Could you please make the first check verify that the value _eventually_ appears? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org