denis-chudov commented on code in PR #4707:
URL: https://github.com/apache/ignite-3/pull/4707#discussion_r1840362315


##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() {
                             ? lease.getLeaseholder()
                             : lease.proposedCandidate();
 
-                    ClusterNode candidate = nextLeaseHolder(assignments, 
grpId, proposedLeaseholder);
+                    ClusterNode candidate = nextLeaseHolder(stableAssignments, 
grpId, proposedLeaseholder);
 
+                    // if there wasn't a candidate among stable assignments 
set then make attempt to select a candidate among pending set

Review Comment:
   ```suggestion
                       // If there wasn't a candidate among stable assignments 
set then make attempt to select a candidate among pending set.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() {
                             ? lease.getLeaseholder()
                             : lease.proposedCandidate();
 
-                    ClusterNode candidate = nextLeaseHolder(assignments, 
grpId, proposedLeaseholder);
+                    ClusterNode candidate = nextLeaseHolder(stableAssignments, 
grpId, proposedLeaseholder);
 
+                    // if there wasn't a candidate among stable assignments 
set then make attempt to select a candidate among pending set
+                    if (candidate == null) {
+                        Map<ReplicationGroupId, TokenizedAssignments> 
pendingMap = assignmentsTracker.pendingAssignments();
+
+                        if (pendingMap.containsKey(grpId)) {
+                            Set<Assignment> pendingAssignments = 
pendingMap.get(grpId).nodes();
+                            candidate = nextLeaseHolder(pendingAssignments, 
grpId, proposedLeaseholder);
+                        }
+                    }
+
+                    // if we couldn't find a candidate neither stable nor 
pending assignments set, so update stats and skip iteration

Review Comment:
   ```suggestion
                       // If we couldn't find a candidate in neither stable nor 
pending assignments set, so update stats and skip iteration.
   ```



##########
modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java:
##########
@@ -117,19 +123,30 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
     void setUp() {
         HybridClockImpl clock = new HybridClockImpl();
 
-        Entry entry = new EntryImpl(
+        Entry stableEntry = new EntryImpl(
                 stablePartAssignmentsKey(new TablePartitionId(1, 0)).bytes(),
                 Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
Assignment.forPeer(node.name())).toBytes(),
                 1,
                 clock.now()
         );
 
-        when(mcEntriesCursor.iterator()).thenReturn(List.of(entry).iterator());
+        Entry pendingEntry = new EntryImpl(
+                pendingPartAssignmentsKey(new TablePartitionId(1, 0)).bytes(),
+                Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
Assignment.forPeer(node.name())).toBytes(),
+                1,
+                clock.now()
+        );
+
+        
when(msStableAssignmentsEntriesCursor.iterator()).thenReturn(List.of(stableEntry).iterator());

Review Comment:
   I would suggest a simple test when `msStableAssignmentsEntriesCursor` 
returns empty iterator and a node from pending assignments is selected as a 
leaseholder. Ofc another node should be written to `pendingEntry`.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -412,7 +412,7 @@ private void updateLeaseBatchInternal() {
                 if (!lease.isAccepted()) {
                     LeaseAgreement agreement = 
leaseNegotiator.getAndRemoveIfReady(grpId);
 
-                    agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), assignments);
+                    agreement.checkValid(grpId, 
topologyTracker.currentTopologySnapshot(), stableAssignments);

Review Comment:
   I think, here should be a union of stable and pending assignments. Otherwise 
it will lead to invalidation of lease agreement created for pending assignment.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -147,48 +141,137 @@ public CompletableFuture<List<TokenizedAssignments>> 
getAssignments(
     }
 
     /**
-     * Gets assignments.
+     * Gets stable assignments.
      *
-     * @return Map replication group id to its assignment.
+     * @return Map replication group id to its stable assignments.
      */
-    public Map<ReplicationGroupId, TokenizedAssignments> assignments() {
-        return groupAssignments;
+    Map<ReplicationGroupId, TokenizedAssignments> stableAssignments() {
+        return groupStableAssignments;
     }
 
     /**
-     * Meta storage assignments watch.
+     * Gets pending assignments.
+     *
+     * @return Map replication group id to its pending assignments.
      */
-    private class AssignmentsListener implements WatchListener {
-        @Override
-        public CompletableFuture<Void> onUpdate(WatchEvent event) {
-            assert !event.entryEvents().stream().anyMatch(e -> 
e.newEntry().empty()) : "New assignments are empty";
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Assignment update [revision={}, keys={}]", 
event.revision(),
-                        event.entryEvents().stream()
-                                .map(e -> new 
ByteArray(e.newEntry().key()).toString())
-                                .collect(Collectors.joining(",")));
+    Map<ReplicationGroupId, TokenizedAssignments> pendingAssignments() {
+        return groupStableAssignments;
+    }
+
+    private WatchListener createStableAssignmentsListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                assert areAssignmentsPresentedInEvent(event) : "New 
assignments are empty";
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Stable assignments update [revision={}, 
keys={}]", event.revision(), collectKeysFromEventAsString(event));
+                }
+
+                handleReceivedAssignments(event, STABLE_ASSIGNMENTS_PREFIX, 
groupStableAssignments);
+
+                return nullCompletedFuture();
             }
 
-            for (EntryEvent evt : event.entryEvents()) {
-                Entry entry = evt.newEntry();
+            @Override
+            public void onError(Throwable e) {
+            }
+        };
+    }
 
-                var replicationGrpId = TablePartitionId.fromString(
-                        new String(entry.key(), 
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
+    private WatchListener createPendingAssignmentsListener() {
+        return new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                assert areAssignmentsPresentedInEvent(event) : "New 
assignments are empty";

Review Comment:
   I it really impossible? I mean, on a cluster where no tables exist
   Do we always get assignments from default zone?
   I am not sure this check is needed as strict assertion check
   Same question for stables



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -443,8 +443,19 @@ private void updateLeaseBatchInternal() {
                             ? lease.getLeaseholder()
                             : lease.proposedCandidate();
 
-                    ClusterNode candidate = nextLeaseHolder(assignments, 
grpId, proposedLeaseholder);
+                    ClusterNode candidate = nextLeaseHolder(stableAssignments, 
grpId, proposedLeaseholder);
 
+                    // if there wasn't a candidate among stable assignments 
set then make attempt to select a candidate among pending set
+                    if (candidate == null) {
+                        Map<ReplicationGroupId, TokenizedAssignments> 
pendingMap = assignmentsTracker.pendingAssignments();
+
+                        if (pendingMap.containsKey(grpId)) {
+                            Set<Assignment> pendingAssignments = 
pendingMap.get(grpId).nodes();
+                            candidate = nextLeaseHolder(pendingAssignments, 
grpId, proposedLeaseholder);
+                        }
+                    }

Review Comment:
   This is the place where prolonged and expired leases are processed, while 
you need to create a new one. So please move this block to 
`chooseCandidateAndCreateNewLease`, it's more suitable there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to