sashapolo commented on code in PR #5310: URL: https://github.com/apache/ignite-3/pull/5310#discussion_r1974268369
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -600,7 +620,7 @@ private CompletableFuture<?> createZonePartitionReplicationNode( .thenCompose(v -> executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> { replicationGroupIds.add(zonePartitionId); - return falseCompletedFuture(); + return trueCompletedFuture(); Review Comment: This is crazy, why didn't we have any failing tests before? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode( this::calculateZoneAssignments, rebalanceRetryDelayConfiguration ); - Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> { + var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); Review Comment: Should the `storageIndexTracker` be part of `ZonePartitionResources`? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -237,6 +237,12 @@ private IgniteBiTuple<Serializable, Boolean> processCrossTableProcessorsCommand( ) { IgniteBiTuple<Serializable, Boolean> result = new IgniteBiTuple<>(null, false); + // TODO https://issues.apache.org/jira/browse/IGNITE-24517 Remove. 
In case of zero tables we still should Review Comment: You can remove this code, this will be fixed automatically by https://github.com/apache/ignite-3/pull/5237 ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java: ########## @@ -150,7 +158,15 @@ private static void assertThatStorageIsStopped(ZonePartitionResources resources) ); } - private ZonePartitionResources allocatePartitionResources(ZonePartitionId zonePartitionId, int partitionCount) { - return bypassingThreadAssertions(() -> manager.allocateZonePartitionResources(zonePartitionId, partitionCount)); + private ZonePartitionResources allocatePartitionResources( + ZonePartitionId zonePartitionId, + int partitionCount, + PendingComparableValuesTracker<Long, Void> storageIndexTracker Review Comment: You always pass the same value you have already stored in a field, so this parameter is redundant ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode( this::calculateZoneAssignments, rebalanceRetryDelayConfiguration ); - Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> { + var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision); - ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount); + ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources( + zonePartitionId, + partitionCount, + storageIndexTracker + ); return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams) .thenCompose(v -> { try { + // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex(). 
+ // https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value + // from txStatePartitionStorage().lastAppliedIndex(), is that correct? + storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null); Review Comment: This is correct, but I think that `storageIndexTracker` should not be updated here; instead, it should be updated inside `onSnapshotLoad` of the corresponding Raft listener. Otherwise we may have inconsistencies on recovery, depending on how this tracker is used. ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java: ########## @@ -146,13 +151,16 @@ void setUp( when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(completedFuture(Set.of(nodeName))); - when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt())) + when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt(), any())) .thenReturn(new ZonePartitionResources( txStatePartitionStorage, raftGroupListener, partitionSnapshotStorageFactory )); + when(raftManager.startRaftGroupNode(any(), any(), any(), any(), (RaftGroupOptions) any(), any())) Review Comment: Why do you need the cast to `RaftGroupOptions`?
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode( this::calculateZoneAssignments, rebalanceRetryDelayConfiguration ); - Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> { + var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision); - ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount); + ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources( + zonePartitionId, + partitionCount, + storageIndexTracker + ); return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams) .thenCompose(v -> { try { + // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex(). + // https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value + // from txStatePartitionStorage().lastAppliedIndex(), is that correct? + storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null); + + // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Properly close storageIndexTracker. + // internalTbl.updatePartitionTrackers is used in order to add storageIndexTracker to some context for further + // storage closing. + // internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); Review Comment: This is confusing, why is this line commented out? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -275,6 +281,14 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp tableProcessors.values() .forEach(listener -> listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm)); + + partitionSnapshots().acquireReadLock(); Review Comment: Why do we need to acquire the snapshot lock here? As far as I understand, we need this lock when processing commands that non-atomically update the storage, in order to prevent concurrent snapshots from observing an intermediate storage state. `updateTrackerIgnoringTrackerClosedException` does not access the storage at all. However, we can add this lock around the `tableProcessors.values()` call above as an optimization, because this lock is currently taken separately for every table processor. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1443,6 +1452,35 @@ private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier< } } + private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) { + if (topologyService.localMember().id().equals(parameters.leaseholderId())) { + ZonePartitionId groupId = (ZonePartitionId) parameters.groupId(); + + replicaMgr.weakStopReplica( Review Comment: Please leave a comment explaining why we don't wait for this future (or at least stating that this is intentional).
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -307,6 +321,10 @@ public void addTableProcessor(TablePartitionId tablePartitionId, RaftTableProces currentCommitedConfiguration.lastAppliedIndex, currentCommitedConfiguration.lastAppliedTerm ); + + // TODO https://issues.apache.org/jira/browse/IGNITE-24517 propagate lease information from txnStateStorage to newly added Review Comment: Will be fixed by https://github.com/apache/ignite-3/pull/5237 as well ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java: ########## @@ -59,6 +60,9 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest { private ZoneResourcesManager manager; + // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Ensure that tracker is closed. + private PendingComparableValuesTracker<Long, Void> storageIndexTracker; Review Comment: Why don't you close it in this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org