denis-chudov commented on code in PR #4707: URL: https://github.com/apache/ignite-3/pull/4707#discussion_r1840362315
########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java: ########## @@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() { ? lease.getLeaseholder() : lease.proposedCandidate(); - ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder); + ClusterNode candidate = nextLeaseHolder(stableAssignments, grpId, proposedLeaseholder); + // if there wasn't a candidate among stable assignments set then make attempt to select a candidate among pending set Review Comment: ```suggestion // If there wasn't a candidate among stable assignments set then make attempt to select a candidate among pending set. ``` ########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java: ########## @@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() { ? lease.getLeaseholder() : lease.proposedCandidate(); - ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder); + ClusterNode candidate = nextLeaseHolder(stableAssignments, grpId, proposedLeaseholder); + // if there wasn't a candidate among stable assignments set then make attempt to select a candidate among pending set + if (candidate == null) { + Map<ReplicationGroupId, TokenizedAssignments> pendingMap = assignmentsTracker.pendingAssignments(); + + if (pendingMap.containsKey(grpId)) { + Set<Assignment> pendingAssignments = pendingMap.get(grpId).nodes(); + candidate = nextLeaseHolder(pendingAssignments, grpId, proposedLeaseholder); + } + } + + // if we couldn't find a candidate neither stable nor pending assignments set, so update stats and skip iteration Review Comment: ```suggestion // If we couldn't find a candidate in neither stable nor pending assignments set, so update stats and skip iteration. ``` ########## modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java: ########## @@ -117,19 +123,30 @@ public class LeaseUpdaterTest extends BaseIgniteAbstractTest { void setUp() { HybridClockImpl clock = new HybridClockImpl(); - Entry entry = new EntryImpl( + Entry stableEntry = new EntryImpl( stablePartAssignmentsKey(new TablePartitionId(1, 0)).bytes(), Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), Assignment.forPeer(node.name())).toBytes(), 1, clock.now() ); - when(mcEntriesCursor.iterator()).thenReturn(List.of(entry).iterator()); + Entry pendingEntry = new EntryImpl( + pendingPartAssignmentsKey(new TablePartitionId(1, 0)).bytes(), + Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), Assignment.forPeer(node.name())).toBytes(), + 1, + clock.now() + ); + + when(msStableAssignmentsEntriesCursor.iterator()).thenReturn(List.of(stableEntry).iterator()); Review Comment: I would suggest a simple test when `msStableAssignmentsEntriesCursor` returns empty iterator and a node from pending assignments is selected as a leaseholder. Ofc another node should be written to `pendingEntry`. ########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java: ########## @@ -412,7 +412,7 @@ private void updateLeaseBatchInternal() { if (!lease.isAccepted()) { LeaseAgreement agreement = leaseNegotiator.getAndRemoveIfReady(grpId); - agreement.checkValid(grpId, topologyTracker.currentTopologySnapshot(), assignments); + agreement.checkValid(grpId, topologyTracker.currentTopologySnapshot(), stableAssignments); Review Comment: I think, here should be a union of stable and pending assignments. Otherwise it will lead to invalidation of lease agreement created for pending assignment. ########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java: ########## @@ -147,48 +141,137 @@ public CompletableFuture<List<TokenizedAssignments>> getAssignments( } /** - * Gets assignments. + * Gets stable assignments. * - * @return Map replication group id to its assignment. + * @return Map replication group id to its stable assignments. */ - public Map<ReplicationGroupId, TokenizedAssignments> assignments() { - return groupAssignments; + Map<ReplicationGroupId, TokenizedAssignments> stableAssignments() { + return groupStableAssignments; } /** - * Meta storage assignments watch. + * Gets pending assignments. + * + * @return Map replication group id to its pending assignments. */ - private class AssignmentsListener implements WatchListener { - @Override - public CompletableFuture<Void> onUpdate(WatchEvent event) { - assert !event.entryEvents().stream().anyMatch(e -> e.newEntry().empty()) : "New assignments are empty"; - - if (LOG.isDebugEnabled()) { - LOG.debug("Assignment update [revision={}, keys={}]", event.revision(), - event.entryEvents().stream() - .map(e -> new ByteArray(e.newEntry().key()).toString()) - .collect(Collectors.joining(","))); + Map<ReplicationGroupId, TokenizedAssignments> pendingAssignments() { + return groupStableAssignments; + } + + private WatchListener createStableAssignmentsListener() { + return new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + assert areAssignmentsPresentedInEvent(event) : "New assignments are empty"; + + if (LOG.isDebugEnabled()) { + LOG.debug("Stable assignments update [revision={}, keys={}]", event.revision(), collectKeysFromEventAsString(event)); + } + + handleReceivedAssignments(event, STABLE_ASSIGNMENTS_PREFIX, groupStableAssignments); + + return nullCompletedFuture(); } - for (EntryEvent evt : event.entryEvents()) { - Entry entry = evt.newEntry(); + @Override + public void onError(Throwable e) { + } + }; + } - var replicationGrpId = TablePartitionId.fromString( - new String(entry.key(), StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, "")); + private WatchListener createPendingAssignmentsListener() { + return new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + assert areAssignmentsPresentedInEvent(event) : "New assignments are empty"; Review Comment: I it really impossible? I mean, on a cluster where no tables exist Do we always get assignments from default zone? I am not sure this check is needed as strict assertion check Same question for stables ########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java: ########## @@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() { ? lease.getLeaseholder() : lease.proposedCandidate(); - ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder); + ClusterNode candidate = nextLeaseHolder(stableAssignments, grpId, proposedLeaseholder); + // if there wasn't a candidate among stable assignments set then make attempt to select a candidate among pending set + if (candidate == null) { + Map<ReplicationGroupId, TokenizedAssignments> pendingMap = assignmentsTracker.pendingAssignments(); + + if (pendingMap.containsKey(grpId)) { + Set<Assignment> pendingAssignments = pendingMap.get(grpId).nodes(); + candidate = nextLeaseHolder(pendingAssignments, grpId, proposedLeaseholder); + } + } Review Comment: This is the place where prolonged and expired leases are processed, while you need to create a new one. So please move this block to `chooseCandidateAndCreateNewLease`, it's more suitable there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org