denis-chudov commented on code in PR #4755:
URL: https://github.com/apache/ignite-3/pull/4755#discussion_r1852392262


##########
modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java:
##########
@@ -283,29 +290,101 @@ public void testNetworkExceptionOnNegotiation() throws 
InterruptedException {
         assertLeaseCorrect(CLUSTER_NODE_0.id());
     }
 
+    @Test
+    public void testAgreementCannotBeOverriddenWhileValid() throws 
InterruptedException {
+        ConditionalWatchInhibitor watchInhibitor = new 
ConditionalWatchInhibitor(metaStorageManager);
+        watchInhibitor.startInhibit(rev -> stableAssignmentsExist(GROUP_ID));
+
+        CompletableFuture<Void> lgmResponseFuture = new CompletableFuture<>();
+        AtomicInteger invokeFailCounter = new AtomicInteger();
+        AtomicInteger lgmCounter = new AtomicInteger();
+
+        metaStorageManager.setAfterInvokeInterceptor(res -> {
+            if (!res) {
+                invokeFailCounter.incrementAndGet();
+            }
+        });
+
+        AtomicLong negotiatedLeaseStartTime = new AtomicLong();
+
+        leaseGrantedMessageHandler = (n, lgm) -> {
+            lgmCounter.incrementAndGet();
+
+            log.info("Lease granted message received [node={}, 
leaseStartTime={}, cntr={}]", n, lgm.leaseStartTime().longValue(),
+                    lgmCounter.get());
+
+            if (negotiatedLeaseStartTime.get() == 0) {
+                negotiatedLeaseStartTime.set(lgm.leaseStartTime().longValue());
+            }
+
+            return lgmResponseFuture.thenApply(unused -> 
createLeaseGrantedMessageResponse(true));
+        };
+
+        metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), 
Assignments.toBytes(Set.of(forPeer(NODE_0_NAME)), assignmentsTimestamp));
+
+        assertTrue(waitForCondition(() -> lgmCounter.get() == 1, 3_000));
+
+        // Wait for a couple of iterations of LeaseUpdater.
+        Thread.sleep(1_000);
+
+        lgmResponseFuture.complete(null);
+        watchInhibitor.stopInhibit();
+
+        waitForAcceptedLease();
+
+        Lease lease = getLeaseFromMs();
+
+        assertEquals(negotiatedLeaseStartTime.get(), 
lease.getStartTime().longValue());
+    }
+
+    private boolean stableAssignmentsExist(TablePartitionId groupId) {
+        CompletableFuture<Entry> f = 
metaStorageManager.get(stablePartAssignmentsKey(groupId));
+
+        assertThat(f, willSucceedFast());
+
+        Entry e = f.join();
+
+        return !e.empty() && !e.tombstone();
+    }
+
+    @Nullable
     private Lease getLeaseFromMs() {
         CompletableFuture<Entry> f = 
metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
 
         assertThat(f, willSucceedFast());
 
         Entry e = f.join();
 
+        if (e.empty() || e.tombstone()) {
+            return null;
+        }
+
         LeaseBatch leases = 
LeaseBatch.fromBytes(ByteBuffer.wrap(e.value()).order(ByteOrder.LITTLE_ENDIAN));
 
-        return leases.leases().stream().findFirst().orElseThrow();
+        return leases.leases().stream().findFirst().orElse(null);
+    }
+
+    private void waitForLease() throws InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            Lease lease = getLeaseFromMs();
+
+            return lease != null;
+        }, 10_000));
     }
 
     private void waitForAcceptedLease() throws InterruptedException {
         assertTrue(waitForCondition(() -> {
             Lease lease = getLeaseFromMs();
+            log.info("qqq lease=" + lease);

Review Comment:
   wow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to