vldpyatkov commented on code in PR #4755: URL: https://github.com/apache/ignite-3/pull/4755#discussion_r1850325679
########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java: ########## @@ -537,7 +537,7 @@ private void updateLeaseBatchInternal() { ); } - if (Arrays.equals(leasesCurrent.leasesBytes(), renewedValue)) { + if (renewedLeases.isEmpty() || Arrays.equals(leasesCurrent.leasesBytes(), renewedValue)) { Review Comment: Why cannot we remove all leases? ########## modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/ConditionalWatchInhibitor.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.test; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.MetaStorageManager; + +/** + * Watch inhibitor that starts inhibiting on certain condition. 
+ */ +public class ConditionalWatchInhibitor { + private static final IgniteLogger LOG = Loggers.forClass(ConditionalWatchInhibitor.class); + + private final MetaStorageManager metaStorageManager; + + private CompletableFuture inhibitFuture; + + public ConditionalWatchInhibitor(MetaStorageManager metaStorageManager) { + this.metaStorageManager = metaStorageManager; + } + + /** + * Starts inhibiting on condition. + * + * @param pred Condition. + */ + public void startInhibit(Predicate<Long> pred) { + inhibitFuture = new CompletableFuture<>(); + metaStorageManager.registerRevisionUpdateListener(rev -> { + if (pred.test(rev)) { + LOG.info("Started inhibiting, rev=" + rev); + return inhibitFuture; + } else { + return completedFuture(null); + } + } + ); + } + + /** + * Stop inhibiting. + */ + public void stopInhibit() { + inhibitFuture.complete(null); + } +} Review Comment: Add a line break here. ########## modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java: ########## @@ -201,6 +206,8 @@ public void testAssignmentChangeOnNegotiation() throws InterruptedException { metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_1_NAME)), assignmentsTimestamp)); + System.out.println("qwe"); Review Comment: Useless comment. ########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java: ########## @@ -124,9 +124,23 @@ public LeaseAgreement getAndRemoveIfReady(ReplicationGroupId groupId) { * * @param groupId Group id. * @param lease Lease to negotiate. + * @return If there is an existing agreement, the lease from agreement is returned, otherwise the new agreement is created and + * {@code null} is returned. 
*/ - public void createAgreement(ReplicationGroupId groupId, Lease lease) { - leaseToNegotiate.put(groupId, new LeaseAgreement(lease)); + @Nullable + public Lease createAgreement(ReplicationGroupId groupId, Lease lease) { Review Comment: Why not return a boolean? ########## modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java: ########## @@ -309,9 +317,41 @@ private void initAndActivateLeaseUpdater() { * @throws InterruptedException if the wait is interrupted. */ private Lease awaitForLease() throws InterruptedException { + return awaitForLease(false); + } + + /** + * Waits for lease write to Meta storage. + * + * @param needAccepted Whether to wait only for accepted lease. + * @return A lease. + * @throws InterruptedException if the wait is interrupted. + */ + private Lease awaitForLease(boolean needAccepted) throws InterruptedException { + return awaitForLease(needAccepted, null); + } + + /** + * Waits for lease write to Meta storage. + * + * @param needAccepted Whether to wait only for accepted lease. + * @param previousLease Previous lease. If not null, then wait for any lease having expiration time other than the previous has (i.e. + * either another lease or prolonged lease). + * @return A lease. + * @throws InterruptedException if the wait is interrupted. + */ + private Lease awaitForLease(boolean needAccepted, @Nullable Lease previousLease) throws InterruptedException { Review Comment: I did not find where `previousLease` is used. 
########## modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseNegotiationTest.java: ########## @@ -283,29 +290,101 @@ public void testNetworkExceptionOnNegotiation() throws InterruptedException { assertLeaseCorrect(CLUSTER_NODE_0.id()); } + @Test + public void testAgreementCannotBeOverriddenWhileValid() throws InterruptedException { + ConditionalWatchInhibitor watchInhibitor = new ConditionalWatchInhibitor(metaStorageManager); + watchInhibitor.startInhibit(rev -> stableAssignmentsExist(GROUP_ID)); + + CompletableFuture<Void> lgmResponseFuture = new CompletableFuture<>(); + AtomicInteger invokeFailCounter = new AtomicInteger(); + AtomicInteger lgmCounter = new AtomicInteger(); + + metaStorageManager.setAfterInvokeInterceptor(res -> { + if (!res) { + invokeFailCounter.incrementAndGet(); + } + }); + + AtomicLong negotiatedLeaseStartTime = new AtomicLong(); + + leaseGrantedMessageHandler = (n, lgm) -> { + lgmCounter.incrementAndGet(); + + log.info("Lease granted message received [node={}, leaseStartTime={}, cntr={}]", n, lgm.leaseStartTime().longValue(), + lgmCounter.get()); + + if (negotiatedLeaseStartTime.get() == 0) { + negotiatedLeaseStartTime.set(lgm.leaseStartTime().longValue()); + } + + return lgmResponseFuture.thenApply(unused -> createLeaseGrantedMessageResponse(true)); + }; + + metaStorageManager.put(stablePartAssignmentsKey(GROUP_ID), Assignments.toBytes(Set.of(forPeer(NODE_0_NAME)), assignmentsTimestamp)); + + assertTrue(waitForCondition(() -> lgmCounter.get() == 1, 3_000)); + + // Wait for a couple of iterations of LeaseUpdater. 
+ Thread.sleep(1_000); + + lgmResponseFuture.complete(null); + watchInhibitor.stopInhibit(); + + waitForAcceptedLease(); + + Lease lease = getLeaseFromMs(); + + assertEquals(negotiatedLeaseStartTime.get(), lease.getStartTime().longValue()); + } + + private boolean stableAssignmentsExist(TablePartitionId groupId) { + CompletableFuture<Entry> f = metaStorageManager.get(stablePartAssignmentsKey(groupId)); + + assertThat(f, willSucceedFast()); + + Entry e = f.join(); + + return !e.empty() && !e.tombstone(); + } + + @Nullable private Lease getLeaseFromMs() { CompletableFuture<Entry> f = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY); assertThat(f, willSucceedFast()); Entry e = f.join(); + if (e.empty() || e.tombstone()) { + return null; + } + LeaseBatch leases = LeaseBatch.fromBytes(ByteBuffer.wrap(e.value()).order(ByteOrder.LITTLE_ENDIAN)); - return leases.leases().stream().findFirst().orElseThrow(); + return leases.leases().stream().findFirst().orElse(null); + } + + private void waitForLease() throws InterruptedException { + assertTrue(waitForCondition(() -> { + Lease lease = getLeaseFromMs(); + + return lease != null; + }, 10_000)); } private void waitForAcceptedLease() throws InterruptedException { assertTrue(waitForCondition(() -> { Lease lease = getLeaseFromMs(); + log.info("qqq lease=" + lease); Review Comment: Remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org