Cyrill commented on code in PR #5193: URL: https://github.com/apache/ignite-3/pull/5193#discussion_r1951347686
########## modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java: ########## @@ -416,4 +439,79 @@ final Set<String> nodeNames(Integer... indexes) { .map(i -> node(i).name()) .collect(Collectors.toUnmodifiableSet()); } + + static List<Throwable> insertValues(Table table, int offset) { + KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView(); + + List<Throwable> errors = new ArrayList<>(); + + for (int i = 0; i < ENTRIES; i++) { + Tuple key = Tuple.create(of("id", i)); + + CompletableFuture<Void> insertFuture = keyValueView.putAsync(null, key, Tuple.create(of("val", i + offset))); + + try { + insertFuture.get(10, SECONDS); + + Tuple value = keyValueView.get(null, key); + assertNotNull(value); + } catch (Throwable e) { + Throwable cause = unwrapCause(e); + + if (cause instanceof IgniteException && isPrimaryReplicaHasChangedException((IgniteException) cause) + || cause instanceof TransactionException + || cause instanceof TimeoutException + ) { + errors.add(cause); + } else { + fail("Unexpected exception", e); + } + } + } + + return errors; + } + + static void assertValuesPresent(Table table) { + IntStream.range(0, ENTRIES).forEach(i -> { + CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null, Tuple.create(of("id", i))); + assertThat(fut, willCompleteSuccessfully()); + + assertNotNull(fut.join()); + }); + } + + void assertValuesPresentOnNodes(IgniteImpl node, Table table, Integer... indexes) { + HybridTimestamp ts = node.clock().now(); + for (Integer index : indexes) { + assertValuesPresentOnNode(table, ts, index); + } + } + + private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targetNodeIndex) { + IgniteImpl targetNode = unwrapIgniteImpl(node(targetNodeIndex)); + + TableImpl tableImpl = unwrapTableImpl(table); + InternalTable internalTable = tableImpl.internalTable(); + + for (int i = 0; i < ENTRIES; i++) { + CompletableFuture<BinaryRow> fut = + internalTable.get(marshalKey(tableImpl, Tuple.create(of("id", i))), ts, targetNode.node()); + assertThat(fut, willCompleteSuccessfully()); + + assertNotNull(fut.join()); + } + } + + private static Row marshalKey(TableViewInternal table, Tuple key) { + SchemaRegistry schemaReg = table.schemaView(); + + var marshaller = new TupleMarshallerImpl(schemaReg.lastKnownSchema()); + + return marshaller.marshal(key, null); + } + + private static boolean isPrimaryReplicaHasChangedException(IgniteException cause) { + return cause.getMessage() != null && cause.getMessage().contains("The primary replica has changed"); Review Comment: done ########## modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java: ########## @@ -472,6 +481,36 @@ static void assertValuesPresent(Table table) { }); } + void assertValuesPresentOnNodes(IgniteImpl node, Table table, Integer... indexes) { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org