denis-chudov commented on code in PR #6192:
URL: https://github.com/apache/ignite-3/pull/6192#discussion_r2192320647


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {

Review Comment:
   This test is flaky, please fix it



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -153,6 +259,24 @@ private static Function<Ignite, RaftGroupService> 
toRaftClient(String tableName)
         };
     }
 
+    private static Function<Ignite, ReplicaListener> toReplicaListener(String 
zoneName, String tableName, int partId) {

Review Comment:
   ```suggestion
       private static Function<Ignite, ReplicaListener> 
getReplicaListener(String zoneName, String tableName, int partId) {
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");
+
+        cluster.startNode((int) cluster.runningNodes().count()); // starting a 
new node adds a learner
+        AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>();
+
+        await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            Set<Assignment> stableAssignments = 
stablePartitionAssignments(cluster.node(0), "TEST");
+            String learner = stableAssignments.stream().filter(a -> 
!a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null);
+            assertTrue(learner != null && !learner.isBlank(), "learners found 
after staring new node");
+            learnerRef.set(findNode(node -> node.name().equals(learner)));
+        });
+
+        IgniteImpl learner = learnerRef.get();
+        ReplicaListener replicaListener = 
of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 
0)).orElseThrow();
+        ClusterNode learnerClusterNode = 
learner.clusterService().topologyService().localMember();
+        SchemaDescriptor schema = 
unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema();
+        KvMarshaller<Integer, Integer> marshaller = new 
ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class);
+        Row pk = marshaller.marshal(0);
+
+        CompletableFuture<ReplicaResult> singleRowResult = 
replicaListener.invoke(
+                readOnlySingleRowPkReplicaRequest(learner, "TEST_ZONE", 
"TEST", 0, pk),
+                learnerClusterNode.id()
+        );
+        assertThat(singleRowResult, willCompleteSuccessfully());
+        Row singleRow = Row.wrapBinaryRow(schema, (BinaryRow) 
singleRowResult.join().result());
+        assertThat(marshaller.unmarshalKey(singleRow), equalTo(0));
+        assertThat(marshaller.unmarshalValue(singleRow), equalTo(0));
+
+        CompletableFuture<ReplicaResult> multiRowResult = 
replicaListener.invoke(
+                readOnlyMultiRowPkReplicaRequest(learner, "TEST_ZONE", "TEST", 
0, pk),
+                learnerClusterNode.id()
+        );
+        assertThat(multiRowResult, willCompleteSuccessfully());
+        Row multiRow = Row.wrapBinaryRow(schema, ((List<BinaryRow>) 
multiRowResult.join().result()).get(0));
+        assertThat(marshaller.unmarshalKey(multiRow), equalTo(0));
+        assertThat(marshaller.unmarshalValue(multiRow), equalTo(0));
+
+        CompletableFuture<ReplicaResult> scanResult = replicaListener.invoke(
+                readOnlyScanRetrieveBatchReplicaRequest(learner, "TEST_ZONE", 
"TEST", 0),
+                learnerClusterNode.id()
+        );
+        assertThat(scanResult, willCompleteSuccessfully());
+        Row scanRow = Row.wrapBinaryRow(schema, ((List<BinaryRow>) 
scanResult.join().result()).get(0));
+        assertThat(marshaller.unmarshalKey(scanRow), equalTo(0));
+        assertThat(marshaller.unmarshalValue(scanRow), equalTo(0));
+    }
+
+    private IgniteImpl findNode(Predicate<? super Ignite> predicate) {

Review Comment:
   You can move this to ClusterPerTestIntegrationTest



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");
+
+        cluster.startNode((int) cluster.runningNodes().count()); // starting a 
new node adds a learner
+        AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>();
+
+        await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            Set<Assignment> stableAssignments = 
stablePartitionAssignments(cluster.node(0), "TEST");
+            String learner = stableAssignments.stream().filter(a -> 
!a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null);
+            assertTrue(learner != null && !learner.isBlank(), "learners found 
after staring new node");
+            learnerRef.set(findNode(node -> node.name().equals(learner)));
+        });
+
+        IgniteImpl learner = learnerRef.get();
+        ReplicaListener replicaListener = 
of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 
0)).orElseThrow();

Review Comment:
   It seems to me, that this line may be also flaky sometimes. There is no 
linearization between the appearance of the replica on learner and this line



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");
+
+        cluster.startNode((int) cluster.runningNodes().count()); // starting a 
new node adds a learner
+        AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>();
+
+        await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            Set<Assignment> stableAssignments = 
stablePartitionAssignments(cluster.node(0), "TEST");
+            String learner = stableAssignments.stream().filter(a -> 
!a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null);
+            assertTrue(learner != null && !learner.isBlank(), "learners found 
after staring new node");
+            learnerRef.set(findNode(node -> node.name().equals(learner)));
+        });
+
+        IgniteImpl learner = learnerRef.get();
+        ReplicaListener replicaListener = 
of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 
0)).orElseThrow();
+        ClusterNode learnerClusterNode = 
learner.clusterService().topologyService().localMember();
+        SchemaDescriptor schema = 
unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema();
+        KvMarshaller<Integer, Integer> marshaller = new 
ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class);
+        Row pk = marshaller.marshal(0);

Review Comment:
   I would recommend to make a separate serialization method (or method 
returning marshaller)



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");

Review Comment:
   Let's use non-zero values here



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");
+
+        cluster.startNode((int) cluster.runningNodes().count()); // starting a 
new node adds a learner

Review Comment:
   You can override `ClusterPerTestIntegrationTest#initialNodes` with value `4`



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReplicasTest.java:
##########
@@ -101,7 +139,65 @@ void testLearnerReplicaCreatedAfterStartingNewNode() {
         });
     }
 
-    private static Set<Assignment> getStableAssignments(Ignite node, String 
tableName) {
+    @Test
+    void testLearnerReplicaReadOnlyRequestCanReadData() {
+        executeSql("CREATE ZONE TEST_ZONE (PARTITIONS 1, REPLICAS ALL, QUORUM 
SIZE 2) STORAGE PROFILES ['default']");
+        executeSql("CREATE TABLE TEST (id INT PRIMARY KEY, name INT) ZONE 
TEST_ZONE");
+        executeSql("INSERT INTO TEST VALUES (0, 0)");
+
+        cluster.startNode((int) cluster.runningNodes().count()); // starting a 
new node adds a learner
+        AtomicReference<IgniteImpl> learnerRef = new AtomicReference<>();
+
+        await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            Set<Assignment> stableAssignments = 
stablePartitionAssignments(cluster.node(0), "TEST");
+            String learner = stableAssignments.stream().filter(a -> 
!a.isPeer()).map(Assignment::consistentId).findFirst().orElse(null);
+            assertTrue(learner != null && !learner.isBlank(), "learners found 
after staring new node");
+            learnerRef.set(findNode(node -> node.name().equals(learner)));
+        });
+
+        IgniteImpl learner = learnerRef.get();
+        ReplicaListener replicaListener = 
of(learnerRef.get()).map(toReplicaListener("TEST_ZONE", "TEST", 
0)).orElseThrow();
+        ClusterNode learnerClusterNode = 
learner.clusterService().topologyService().localMember();
+        SchemaDescriptor schema = 
unwrapTableViewInternal(learner.tables().table("TEST")).schemaView().lastKnownSchema();
+        KvMarshaller<Integer, Integer> marshaller = new 
ReflectionMarshallerFactory().create(schema, Integer.class, Integer.class);
+        Row pk = marshaller.marshal(0);
+
+        CompletableFuture<ReplicaResult> singleRowResult = 
replicaListener.invoke(
+                readOnlySingleRowPkReplicaRequest(learner, "TEST_ZONE", 
"TEST", 0, pk),
+                learnerClusterNode.id()
+        );
+        assertThat(singleRowResult, willCompleteSuccessfully());
+        Row singleRow = Row.wrapBinaryRow(schema, (BinaryRow) 
singleRowResult.join().result());
+        assertThat(marshaller.unmarshalKey(singleRow), equalTo(0));
+        assertThat(marshaller.unmarshalValue(singleRow), equalTo(0));

Review Comment:
   There is also no HP before the replication of the value to the learner and 
these checks, may become flaky as well. Could you please make the first check 
that the value _eventually_ appears?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to