denis-chudov commented on code in PR #4755: URL: https://github.com/apache/ignite-3/pull/4755#discussion_r1852392262
########## modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java: ########## @@ -283,29 +290,101 @@ public void testNetworkExceptionOnNegotiation() throws InterruptedException { assertLeaseCorrect(CLUSTER_NODE_0.id()); } + @Test + public void testAgreementCannotBeOverriddenWhileValid() throws InterruptedException { + ConditionalWatchInhibitor watchInhibitor = new ConditionalWatchInhibitor(metaStorageManager); + watchInhibitor.startInhibit(rev -> stableAssignmentsExist(GROUP_ID)); + + CompletableFuture<Void> lgmResponseFuture = new CompletableFuture<>(); + AtomicInteger invokeFailCounter = new AtomicInteger(); + AtomicInteger lgmCounter = new AtomicInteger(); + + metaStorageManager.setAfterInvokeInterceptor(res -> { + if (!res) { + invokeFailCounter.incrementAndGet(); + } + }); + + AtomicLong negotiatedLeaseStartTime = new AtomicLong(); + + leaseGrantedMessageHandler = (n, lgm) -> { + lgmCounter.incrementAndGet(); + + log.info("Lease granted message received [node={}, leaseStartTime={}, cntr={}]", n, lgm.leaseStartTime().longValue(), + lgmCounter.get()); + + if (negotiatedLeaseStartTime.get() == 0) { + negotiatedLeaseStartTime.set(lgm.leaseStartTime().longValue()); + } + + return lgmResponseFuture.thenApply(unused -> createLeaseGrantedMessageResponse(true)); + }; + + metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_0_NAME)), assignmentsTimestamp)); + + assertTrue(waitForCondition(() -> lgmCounter.get() == 1, 3_000)); + + // Wait for a couple of iterations of LeaseUpdater. + Thread.sleep(1_000); + + lgmResponseFuture.complete(null); + watchInhibitor.stopInhibit(); + + waitForAcceptedLease(); + + Lease lease = getLeaseFromMs(); + + assertEquals(negotiatedLeaseStartTime.get(), lease.getStartTime().longValue()); + } + + private boolean stableAssignmentsExist(TablePartitionId groupId) { + CompletableFuture<Entry> f = metaStorageManager.get(stablePartAssignmentsKey(groupId)); + + assertThat(f, willSucceedFast()); + + Entry e = f.join(); + + return !e.empty() && !e.tombstone(); + } + + @Nullable private Lease getLeaseFromMs() { CompletableFuture<Entry> f = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY); assertThat(f, willSucceedFast()); Entry e = f.join(); + if (e.empty() || e.tombstone()) { + return null; + } + LeaseBatch leases = LeaseBatch.fromBytes(ByteBuffer.wrap(e.value()).order(ByteOrder.LITTLE_ENDIAN)); - return leases.leases().stream().findFirst().orElseThrow(); + return leases.leases().stream().findFirst().orElse(null); + } + + private void waitForLease() throws InterruptedException { + assertTrue(waitForCondition(() -> { + Lease lease = getLeaseFromMs(); + + return lease != null; + }, 10_000)); } private void waitForAcceptedLease() throws InterruptedException { assertTrue(waitForCondition(() -> { Lease lease = getLeaseFromMs(); + log.info("qqq lease=" + lease); Review Comment: wow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org