sashapolo commented on code in PR #5310:
URL: https://github.com/apache/ignite-3/pull/5310#discussion_r1974268369


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -600,7 +620,7 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                     .thenCompose(v -> executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
                         replicationGroupIds.add(zonePartitionId);
 
-                        return falseCompletedFuture();
+                        return trueCompletedFuture();

Review Comment:
   This is crazy, why didn't we have any failing tests before?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                 this::calculateZoneAssignments,
                 rebalanceRetryDelayConfiguration
         );
-
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+            var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);

Review Comment:
   Should the `storageIndexTracker` be part of `ZonePartitionResources`?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -237,6 +237,12 @@ private IgniteBiTuple<Serializable, Boolean> processCrossTableProcessorsCommand(
     ) {
         IgniteBiTuple<Serializable, Boolean> result = new IgniteBiTuple<>(null, false);
 
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24517 Remove. In case of zero tables we still should

Review Comment:
   You can remove this code; it will be fixed automatically by
https://github.com/apache/ignite-3/pull/5237.



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java:
##########
@@ -150,7 +158,15 @@ private static void assertThatStorageIsStopped(ZonePartitionResources resources)
         );
     }
 
-    private ZonePartitionResources allocatePartitionResources(ZonePartitionId zonePartitionId, int partitionCount) {
-        return bypassingThreadAssertions(() -> manager.allocateZonePartitionResources(zonePartitionId, partitionCount));
+    private ZonePartitionResources allocatePartitionResources(
+            ZonePartitionId zonePartitionId,
+            int partitionCount,
+            PendingComparableValuesTracker<Long, Void> storageIndexTracker

Review Comment:
   You always pass the same value that is already stored in a field, so this
parameter is redundant.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                 this::calculateZoneAssignments,
                 rebalanceRetryDelayConfiguration
         );
-
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+            var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
             var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision);
 
-            ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount);
+            ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
+                    zonePartitionId,
+                    partitionCount,
+                    storageIndexTracker
+            );
 
             return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
                     .thenCompose(v -> {
                         try {
+                            // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex().
+                            //  https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value
+                            //  from txStatePartitionStorage().lastAppliedIndex(), is that correct?
+                            storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null);

Review Comment:
   This is correct, but I think `storageIndexTracker` should not be updated
here; it should be updated inside `onSnapshotLoad` of the corresponding Raft
listener. Otherwise we may get inconsistencies on recovery, depending on how
this tracker is used.
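
   A minimal sketch of the suggestion, using hypothetical stand-in types
rather than the real Ignite classes: `IndexTracker` stands in for
`PendingComparableValuesTracker`, `AppliedIndexSource` for the storage, and
the `onSnapshotLoad` signature is simplified. The point is only that the
listener, not the lifecycle manager, moves the tracker forward after the
storage state has been restored.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-ins; none of these types are the Ignite classes from the PR. */
public class SnapshotLoadTrackerSketch {
    /** Simplified monotonic tracker, assumed to behave like the real one for this purpose. */
    public static final class IndexTracker {
        private final AtomicLong current = new AtomicLong(0L);

        /** Moves the tracked value forward; smaller values are ignored. */
        public void update(long newValue) {
            current.accumulateAndGet(newValue, Math::max);
        }

        public long current() {
            return current.get();
        }
    }

    /** Stand-in for the storage that knows the last applied Raft index. */
    public interface AppliedIndexSource {
        long lastAppliedIndex();
    }

    /** Stand-in for the Raft listener that owns the tracker update. */
    public static final class Listener {
        private final AppliedIndexSource storage;
        private final IndexTracker storageIndexTracker;

        public Listener(AppliedIndexSource storage, IndexTracker storageIndexTracker) {
            this.storage = storage;
            this.storageIndexTracker = storageIndexTracker;
        }

        /** Simplified hook: publishes the restored index once the snapshot has been loaded. */
        public void onSnapshotLoad() {
            storageIndexTracker.update(storage.lastAppliedIndex());
        }
    }
}
```

   With this shape the replica start path only creates the tracker and hands
it over, and every place that restores storage state goes through the same
update.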



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java:
##########
@@ -146,13 +151,16 @@ void setUp(
 
         when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(completedFuture(Set.of(nodeName)));
 
-        when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt()))
+        when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt(), any()))
                 .thenReturn(new ZonePartitionResources(
                         txStatePartitionStorage,
                         raftGroupListener,
                         partitionSnapshotStorageFactory
                 ));
 
+        when(raftManager.startRaftGroupNode(any(), any(), any(), any(), (RaftGroupOptions) any(), any()))

Review Comment:
   Why do you need the cast to `RaftGroupOptions`?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                 this::calculateZoneAssignments,
                 rebalanceRetryDelayConfiguration
         );
-
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+            var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
             var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision);
 
-            ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount);
+            ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
+                    zonePartitionId,
+                    partitionCount,
+                    storageIndexTracker
+            );
 
             return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
                     .thenCompose(v -> {
                         try {
+                            // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex().
+                            //  https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value
+                            //  from txStatePartitionStorage().lastAppliedIndex(), is that correct?
+                            storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null);
+
+                            // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Properly close storageIndexTracker.
+                            //  internalTbl.updatePartitionTrackers is used in order to add storageIndexTracker to some context for further
+                            //  storage closing.
+                            // internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);

Review Comment:
   This is confusing: why is this line commented out?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -275,6 +281,14 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp
 
             tableProcessors.values()
                     .forEach(listener -> listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm));
+
+            partitionSnapshots().acquireReadLock();

Review Comment:
   Why do we need to acquire the snapshot lock here? As far as I understand, we 
need this lock when processing commands that non-atomically update the storage 
in order to prevent concurrent snapshots from observing an intermediate storage 
state. `updateTrackerIgnoringTrackerClosedException` does not access the 
storage at all. However, we can add this lock around the 
`tableProcessors.values()` call above as an optimization, because this lock is 
taken there for every table processor.
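
   A rough sketch of the optimization mentioned above, under the assumption
that the snapshot lock behaves like a reentrant read/write lock. The class,
the `TableProcessor` interface and the `trackedIndex` field are hypothetical
stand-ins, not the code from the PR. The read lock is taken once around the
whole loop, and the tracker update stays outside it because it does not touch
the storage.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the zone-level Raft listener; not the Ignite class. */
public class SnapshotLockSketch {
    /** Stand-in for a per-table processor that updates its storage. */
    public interface TableProcessor {
        void onConfigurationCommitted(long lastAppliedIndex, long lastAppliedTerm);
    }

    private final ReentrantReadWriteLock snapshotLock = new ReentrantReadWriteLock();

    private long trackedIndex;

    /** Takes the snapshot read lock once for all processors instead of once per processor. */
    public void onConfigurationCommitted(List<TableProcessor> processors, long lastAppliedIndex, long lastAppliedTerm) {
        snapshotLock.readLock().lock();

        try {
            for (TableProcessor processor : processors) {
                processor.onConfigurationCommitted(lastAppliedIndex, lastAppliedTerm);
            }
        } finally {
            snapshotLock.readLock().unlock();
        }

        // No storage access here, so the snapshot lock is not needed for the tracker update.
        trackedIndex = Math.max(trackedIndex, lastAppliedIndex);
    }

    public long trackedIndex() {
        return trackedIndex;
    }

    /** Number of read holds the current thread has on the snapshot lock. */
    public int readHoldCount() {
        return snapshotLock.getReadHoldCount();
    }
}
```

   Because the read lock is reentrant in this sketch, a processor that still
acquires it internally keeps working; the outer acquisition only saves the
repeated lock and unlock calls.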



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1443,6 +1452,35 @@ private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<
         }
     }
 
+    private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
+        if (topologyService.localMember().id().equals(parameters.leaseholderId())) {
+            ZonePartitionId groupId = (ZonePartitionId) parameters.groupId();
+
+            replicaMgr.weakStopReplica(

Review Comment:
   Please leave a comment explaining why we don't wait for this future (or at
least noting that this is intentional).
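
   Something along these lines would be enough. This is a sketch with
hypothetical names, not the PR code: the `stopFuture` parameter stands in for
the result of `replicaMgr.weakStopReplica(...)`, the returned value is a
placeholder assumption, and the actual reason for not waiting still has to be
filled in by the author.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical sketch of a documented fire-and-forget call; not the Ignite code. */
public class FireAndForgetSketch {
    /**
     * Handles the event without waiting for the stop operation to finish.
     *
     * @param stopFuture stand-in for the future returned by the stop call.
     * @param errorSink receives a failure of the stop operation, if any.
     * @return an already completed future (the value is a placeholder assumption).
     */
    public static CompletableFuture<Boolean> onPrimaryReplicaExpired(
            CompletableFuture<Void> stopFuture,
            Consumer<Throwable> errorSink
    ) {
        // Intentionally not waiting for stopFuture: <state the reason here>.
        // Failures are still reported, so they do not get lost silently.
        stopFuture.whenComplete((res, ex) -> {
            if (ex != null) {
                errorSink.accept(ex);
            }
        });

        return CompletableFuture.completedFuture(false);
    }
}
```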



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -307,6 +321,10 @@ public void addTableProcessor(TablePartitionId tablePartitionId, RaftTableProces
                         currentCommitedConfiguration.lastAppliedIndex,
                         currentCommitedConfiguration.lastAppliedTerm
                 );
+
+                // TODO https://issues.apache.org/jira/browse/IGNITE-24517 propagate lease information from txnStateStorage to newly added

Review Comment:
   This will be fixed by https://github.com/apache/ignite-3/pull/5237 as well.



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java:
##########
@@ -59,6 +60,9 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
 
     private ZoneResourcesManager manager;
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Ensure that tracker is closed.
+    private PendingComparableValuesTracker<Long, Void> storageIndexTracker;

Review Comment:
   Why don't you close it in this test?


