rpuch commented on code in PR #5212: URL: https://github.com/apache/ignite-3/pull/5212#discussion_r1952514186
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -242,8 +248,11 @@ private static class Listeners { final ZonePartitionRaftListener raftListener; - Listeners(ZonePartitionRaftListener raftListener) { + final PartitionSnapshotStorageFactory snapshotStorageFactory; + + Listeners(ZonePartitionRaftListener raftListener, PartitionSnapshotStorageFactory snapshotStorageFactory) { Review Comment: `Listeners` does not describe the class accurately anymore, I suggest renaming it. How about `ReplicaContext` or something similar? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java: ########## @@ -46,84 +51,101 @@ * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception. */ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { + private final PartitionKey partitionKey; + /** Topology service. */ private final TopologyService topologyService; /** Snapshot manager. */ private final OutgoingSnapshotsManager outgoingSnapshotsManager; - /** Partition storage. */ - private final PartitionAccess partition; + /** + * Partition storages grouped by table ID. + */ + private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>()); + + private final PartitionTxStateAccess txStateStorage; private final CatalogService catalogService; - private final @Nullable SnapshotMeta startupSnapshotMeta; - /** Incoming snapshots executor. */ private final Executor incomingSnapshotsExecutor; - /** - * Constructor. - * - * @param topologyService Topology service. - * @param outgoingSnapshotsManager Snapshot manager. - * @param partition MV partition storage. - * @param catalogService Access to the Catalog. - * @param incomingSnapshotsExecutor Incoming snapshots executor. 
- * @see SnapshotMeta - */ - @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + /** Constructor. */ public PartitionSnapshotStorageFactory( + PartitionKey partitionKey, TopologyService topologyService, OutgoingSnapshotsManager outgoingSnapshotsManager, - PartitionAccess partition, + PartitionTxStateAccess txStateStorage, CatalogService catalogService, Executor incomingSnapshotsExecutor ) { + this.partitionKey = partitionKey; this.topologyService = topologyService; this.outgoingSnapshotsManager = outgoingSnapshotsManager; - this.partition = partition; + this.txStateStorage = txStateStorage; this.catalogService = catalogService; this.incomingSnapshotsExecutor = incomingSnapshotsExecutor; + } - // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the - // lowest applied index and thus no data loss occurs. - long lastIncludedRaftIndex = partition.minLastAppliedIndex(); - long lastIncludedRaftTerm = partition.minLastAppliedTerm(); - - int lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); - - if (lastIncludedRaftIndex == 0) { - startupSnapshotMeta = null; - } else { - startupSnapshotMeta = snapshotMetaAt( - lastIncludedRaftIndex, - lastIncludedRaftTerm, - Objects.requireNonNull(partition.committedGroupConfiguration()), - lastCatalogVersionAtStart, - collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart), - partition.leaseStartTime(), - partition.primaryReplicaNodeId(), - partition.primaryReplicaNodeName() - ); - } + /** + * Adds a given table partition storage to the snapshot storage, managed by this factory. Review Comment: Would it make sense to specify how this call should be made with respect to concurrency? 
For example, under which lock it should be done, or whether it is completely thread-safe, etc. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java: ########## @@ -490,29 +503,29 @@ private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable) raftGroupConfig ); - return partitionSnapshotStorage.partition().finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta, raftGroupConfig)); + return finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta, raftGroupConfig)); } finally { busyLock.leaveBusy(); } } private int partId() { - return partitionSnapshotStorage.partition().partitionKey().partitionId(); + return partitionSnapshotStorage.partitionKey().partitionId(); } private String createPartitionInfo() { - return "tableId=" + partitionSnapshotStorage.partition().partitionKey().tableId() + ", partitionId=" + partId(); + return partitionSnapshotStorage.partitionKey().toString(); } - private void writeVersion(ResponseEntry entry, int i) { + private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int i) { Review Comment: ```suggestion private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int entryIndex) { ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionStorageAccess.java: ########## @@ -31,33 +30,17 @@ import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.StorageRebalanceException; import org.apache.ignite.internal.storage.TxIdMismatchException; -import org.apache.ignite.internal.tx.TxMeta; -import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; /** * Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage. 
*/ -public interface PartitionAccess { - /** - * Returns the key that uniquely identifies the corresponding partition. - */ - PartitionKey partitionKey(); - - /** - * Creates a cursor to scan all meta of transactions. - * - * <p>All metas that exist at the time the method is called will be returned, in transaction ID order as an unsigned 128 bit integer. - */ - Cursor<IgniteBiTuple<UUID, TxMeta>> getAllTxMeta(); +public interface PartitionStorageAccess { Review Comment: ```suggestion public interface PartitionMvStorageAccess { ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -486,17 +509,16 @@ public boolean addRowIdToSkip(RowId rowId) { * @param rowId RowId. * @return {@code true} if the given RowId is already passed by the snapshot in normal rows sending order. */ - public boolean alreadyPassed(RowId rowId) { + public boolean alreadyPassed(int tableId, RowId rowId) { assert mvOperationsLock.isLocked() : "MV operations lock must be acquired!"; - if (!startedToReadMvPartition) { - return false; - } if (finishedMvData()) { return true; } - return rowId.compareTo(lastRowId) <= 0; + return partitionDeliveryState != null + && tableId <= partitionDeliveryState.currentTableId() + && rowId.compareTo(partitionDeliveryState.currentRowId()) <= 0; Review Comment: Imagine we have 2 tables with IDs 1 and 2. We are currently sending the very beginning of table 2. `alreadyPassed(1, hugeRowId)` must return `true` (as we are already working on table 2), but this expression could evaluate to `false` (because `currentRowId()` might be `tinyRowId < hugeRowId`). Let's add a unit test for this ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -239,4 +298,22 @@ private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosed // No-op. 
} } + + private void cleanupSnapshots() { Review Comment: This method (and a couple of other methods) are copy-pasted from `SnapshotAwarePartitionDataStorage`. Would it make sense to break `SnapshotAwarePartitionDataStorage` in two parts and reuse one part here? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java: ########## @@ -46,84 +51,101 @@ * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception. */ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { + private final PartitionKey partitionKey; + /** Topology service. */ private final TopologyService topologyService; /** Snapshot manager. */ private final OutgoingSnapshotsManager outgoingSnapshotsManager; - /** Partition storage. */ - private final PartitionAccess partition; + /** + * Partition storages grouped by table ID. + */ + private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>()); + + private final PartitionTxStateAccess txStateStorage; private final CatalogService catalogService; - private final @Nullable SnapshotMeta startupSnapshotMeta; - /** Incoming snapshots executor. */ private final Executor incomingSnapshotsExecutor; - /** - * Constructor. - * - * @param topologyService Topology service. - * @param outgoingSnapshotsManager Snapshot manager. - * @param partition MV partition storage. - * @param catalogService Access to the Catalog. - * @param incomingSnapshotsExecutor Incoming snapshots executor. - * @see SnapshotMeta - */ - @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + /** Constructor. 
*/ public PartitionSnapshotStorageFactory( + PartitionKey partitionKey, TopologyService topologyService, OutgoingSnapshotsManager outgoingSnapshotsManager, - PartitionAccess partition, + PartitionTxStateAccess txStateStorage, CatalogService catalogService, Executor incomingSnapshotsExecutor ) { + this.partitionKey = partitionKey; this.topologyService = topologyService; this.outgoingSnapshotsManager = outgoingSnapshotsManager; - this.partition = partition; + this.txStateStorage = txStateStorage; this.catalogService = catalogService; this.incomingSnapshotsExecutor = incomingSnapshotsExecutor; + } - // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the - // lowest applied index and thus no data loss occurs. - long lastIncludedRaftIndex = partition.minLastAppliedIndex(); - long lastIncludedRaftTerm = partition.minLastAppliedTerm(); - - int lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); - - if (lastIncludedRaftIndex == 0) { - startupSnapshotMeta = null; - } else { - startupSnapshotMeta = snapshotMetaAt( - lastIncludedRaftIndex, - lastIncludedRaftTerm, - Objects.requireNonNull(partition.committedGroupConfiguration()), - lastCatalogVersionAtStart, - collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart), - partition.leaseStartTime(), - partition.primaryReplicaNodeId(), - partition.primaryReplicaNodeName() - ); - } + /** + * Adds a given table partition storage to the snapshot storage, managed by this factory. 
+ */ + public void addMvPartition(int tableId, PartitionStorageAccess partition) { + PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition); + + assert prev == null : "Partition storage for table ID " + tableId + " already exists."; + } + + public void removeMvPartition(int tableId) { + partitionsByTableId.remove(tableId); } @Override public @Nullable PartitionSnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) { return new PartitionSnapshotStorage( + partitionKey, topologyService, outgoingSnapshotsManager, uri, raftOptions, - partition, + partitionsByTableId, + txStateStorage, catalogService, - startupSnapshotMeta, + createStartupSnapshotMeta(), incomingSnapshotsExecutor ); } + private @Nullable SnapshotMeta createStartupSnapshotMeta() { + // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the + // lowest applied index and thus no data loss occurs. + return partitionsByTableId.values().stream() + .min(comparingLong(PartitionStorageAccess::lastAppliedIndex)) + .map(storageWithMinLastAppliedIndex -> { + long minLastAppliedIndex = min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()); + + if (minLastAppliedIndex == 0) { + return null; + } + + int lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); + + return snapshotMetaAt( + min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()), Review Comment: ```suggestion minLastAppliedIndex, ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java: ########## @@ -46,84 +51,101 @@ * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception. */ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { + private final PartitionKey partitionKey; + /** Topology service. 
*/ private final TopologyService topologyService; /** Snapshot manager. */ private final OutgoingSnapshotsManager outgoingSnapshotsManager; - /** Partition storage. */ - private final PartitionAccess partition; + /** + * Partition storages grouped by table ID. + */ + private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>()); + + private final PartitionTxStateAccess txStateStorage; private final CatalogService catalogService; - private final @Nullable SnapshotMeta startupSnapshotMeta; - /** Incoming snapshots executor. */ private final Executor incomingSnapshotsExecutor; - /** - * Constructor. - * - * @param topologyService Topology service. - * @param outgoingSnapshotsManager Snapshot manager. - * @param partition MV partition storage. - * @param catalogService Access to the Catalog. - * @param incomingSnapshotsExecutor Incoming snapshots executor. - * @see SnapshotMeta - */ - @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + /** Constructor. */ public PartitionSnapshotStorageFactory( + PartitionKey partitionKey, TopologyService topologyService, OutgoingSnapshotsManager outgoingSnapshotsManager, - PartitionAccess partition, + PartitionTxStateAccess txStateStorage, CatalogService catalogService, Executor incomingSnapshotsExecutor ) { + this.partitionKey = partitionKey; this.topologyService = topologyService; this.outgoingSnapshotsManager = outgoingSnapshotsManager; - this.partition = partition; + this.txStateStorage = txStateStorage; this.catalogService = catalogService; this.incomingSnapshotsExecutor = incomingSnapshotsExecutor; + } - // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the - // lowest applied index and thus no data loss occurs. 
- long lastIncludedRaftIndex = partition.minLastAppliedIndex(); - long lastIncludedRaftTerm = partition.minLastAppliedTerm(); - - int lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); - - if (lastIncludedRaftIndex == 0) { - startupSnapshotMeta = null; - } else { - startupSnapshotMeta = snapshotMetaAt( - lastIncludedRaftIndex, - lastIncludedRaftTerm, - Objects.requireNonNull(partition.committedGroupConfiguration()), - lastCatalogVersionAtStart, - collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart), - partition.leaseStartTime(), - partition.primaryReplicaNodeId(), - partition.primaryReplicaNodeName() - ); - } + /** + * Adds a given table partition storage to the snapshot storage, managed by this factory. + */ + public void addMvPartition(int tableId, PartitionStorageAccess partition) { + PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition); + + assert prev == null : "Partition storage for table ID " + tableId + " already exists."; + } + + public void removeMvPartition(int tableId) { Review Comment: Not used anywhere yet. Will it be used when table removal from zone is implemented? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java: ########## @@ -270,18 +278,25 @@ public void close() { @Override public SnapshotReader getReader() { + CompletableFuture<PartitionSnapshotMeta> snapshotMetaFuture = this.snapshotMetaFuture; + // This one's called when "join" is complete. - return new IncomingSnapshotReader(snapshotMeta); + assert snapshotMetaFuture != null && snapshotMetaFuture.isDone(); + + // Use 'null' if we failed or were cancelled, this is what JRaft expects. + PartitionSnapshotMeta meta = snapshotMetaFuture.isCompletedExceptionally() ? 
null : snapshotMetaFuture.join(); + + return new IncomingSnapshotReader(meta); } private @Nullable ClusterNode getSnapshotSender(String nodeName) { return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName); } /** - * Requests and saves the snapshot meta in {@link #snapshotMeta}. + * Requests and the snapshot meta. Review Comment: The sentence seems to be broken ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java: ########## @@ -53,6 +54,28 @@ void onConfigurationCommitted( long lastAppliedTerm ); + /** + * Returns the last applied Raft log index. + */ + long lastAppliedIndex(); + + /** + * Returns the last applied Raft term. Review Comment: ```suggestion * Returns the term of the last applied Raft index. ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot; + +import java.util.Objects; +import org.apache.ignite.internal.tostring.S; + +/** + * Uniquely identifies a partition. 
This is a pair of zone ID and partition number (aka partition ID). + */ +public class ZonePartitionKey implements PartitionKey { + private final int zoneId; + + private final int partitionId; + + /** + * Returns ID of the zone. + */ + public int tableId() { Review Comment: Is the name ok? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; + +import java.util.Collection; +import java.util.Iterator; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.storage.RowId; +import org.jetbrains.annotations.Nullable; + +/** + * Outgoing snapshot delivery state for a given partition. + */ +class PartitionDeliveryState { + private final Iterator<PartitionStorageAccess> partitionStoragesIterator; + + /** + * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. 
+ */ + @Nullable + private RowId currentRowId; + + /** + * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. + */ + @Nullable + private PartitionStorageAccess currentPartitionStorage; + + PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) { + this.partitionStoragesIterator = partitionStorages.iterator(); + + advance(); + } + + RowId currentRowId() { + assert currentRowId != null; + + return currentRowId; + } + + PartitionStorageAccess currentPartitionStorage() { + assert currentPartitionStorage != null; + + return currentPartitionStorage; + } + + int currentTableId() { + return currentPartitionStorage().tableId(); + } + + boolean isEmpty() { Review Comment: ```suggestion boolean isExhausted() { ``` Meaning that it *became* empty because we sent all data ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -162,30 +170,38 @@ void freezeScopeUnderMvLock() { try { frozenMeta = takeSnapshotMeta(); - txDataCursor = partition.getAllTxMeta(); + txDataCursor = txState.getAllTxMeta(); } finally { releaseMvLock(); } } private PartitionSnapshotMeta takeSnapshotMeta() { - RaftGroupConfiguration config = partition.committedGroupConfiguration(); + PartitionStorageAccess partitionStorageWithMaxAppliedIndex = partitionsByTableId.values().stream() + .max(comparingLong(PartitionStorageAccess::lastAppliedIndex)) + .orElseThrow(); Review Comment: How is it guaranteed that `partitionsByTableId` is not empty? What if a zone was created, but it doesn't contain any table yet? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; + +import java.util.Collection; +import java.util.Iterator; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.storage.RowId; +import org.jetbrains.annotations.Nullable; + +/** + * Outgoing snapshot delivery state for a given partition. + */ +class PartitionDeliveryState { + private final Iterator<PartitionStorageAccess> partitionStoragesIterator; + + /** + * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. + */ + @Nullable + private RowId currentRowId; + + /** + * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. 
+ */ + @Nullable + private PartitionStorageAccess currentPartitionStorage; + + PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) { + this.partitionStoragesIterator = partitionStorages.iterator(); + + advance(); + } + + RowId currentRowId() { + assert currentRowId != null; + + return currentRowId; + } + + PartitionStorageAccess currentPartitionStorage() { + assert currentPartitionStorage != null; + + return currentPartitionStorage; + } + + int currentTableId() { + return currentPartitionStorage().tableId(); + } + + boolean isEmpty() { + return currentPartitionStorage == null; + } + + void advance() { + if (currentPartitionStorage == null) { + if (!partitionStoragesIterator.hasNext()) { + return; + } + + currentPartitionStorage = partitionStoragesIterator.next(); + + currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId())); + + // Partition is empty, try the next one. Review Comment: It seems that the comment should be inside `if` body ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -304,23 +320,28 @@ private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) { return sum; } - private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize, - SnapshotMvDataRequest request) { + private long tryProcessRowFromPartition( + List<SnapshotMvDataResponse.ResponseEntry> batch, + long totalBatchSize, + SnapshotMvDataRequest request + ) { if (batchIsFull(request, totalBatchSize) || finishedMvData()) { return totalBatchSize; } - if (!startedToReadMvPartition) { - lastRowId = partition.closestRowId(lastRowId); - - startedToReadMvPartition = true; + if (partitionDeliveryState == null) { + partitionDeliveryState = new PartitionDeliveryState(partitionsByTableId.values()); } else { - lastRowId = 
partition.closestRowId(lastRowId.increment()); + partitionDeliveryState.advance(); } - if (!finishedMvData()) { - if (!rowIdsToSkip.remove(lastRowId)) { - SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(lastRowId); + if (!partitionDeliveryState.isEmpty()) { Review Comment: How about using `!finishedMvData()` instead (as it was)? It has the same effect, but it seems a bit more clear what we want to check in the condition (and also, this piece of code is the main reason the method was introduced) ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -554,10 +564,20 @@ private CompletableFuture<?> createZonePartitionReplicationNode( txManager, safeTimeTracker, storageIndexTracker, - zonePartitionId + zonePartitionId, + outgoingSnapshotsManager ); - var listeners = new Listeners(raftGroupListener); + var snapshotStorageFactory = new PartitionSnapshotStorageFactory( + new ZonePartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId()), Review Comment: Why do we need a key? Can't partition ID by itself play this role? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; + +import java.util.Collection; +import java.util.Iterator; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.storage.RowId; +import org.jetbrains.annotations.Nullable; + +/** + * Outgoing snapshot delivery state for a given partition. + */ +class PartitionDeliveryState { + private final Iterator<PartitionStorageAccess> partitionStoragesIterator; + + /** + * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. + */ + @Nullable + private RowId currentRowId; + + /** + * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. 
+ */ + @Nullable + private PartitionStorageAccess currentPartitionStorage; + + PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) { + this.partitionStoragesIterator = partitionStorages.iterator(); + + advance(); + } + + RowId currentRowId() { + assert currentRowId != null; + + return currentRowId; + } + + PartitionStorageAccess currentPartitionStorage() { + assert currentPartitionStorage != null; + + return currentPartitionStorage; + } + + int currentTableId() { + return currentPartitionStorage().tableId(); + } + + boolean isEmpty() { + return currentPartitionStorage == null; + } + + void advance() { + if (currentPartitionStorage == null) { + if (!partitionStoragesIterator.hasNext()) { + return; + } + + currentPartitionStorage = partitionStoragesIterator.next(); + + currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId())); + + // Partition is empty, try the next one. + if (currentRowId == null) { + moveToNextPartitionStorage(); + } + } else { + assert currentRowId != null; + + RowId nextRowId = currentRowId.increment(); + + // We've exhausted all possible row IDs in the partition, switch to the next one. + if (nextRowId == null) { + moveToNextPartitionStorage(); + } else { + currentRowId = currentPartitionStorage.closestRowId(nextRowId); + + // We've read all data from this partition, switch to the next one. + if (currentRowId == null) { + moveToNextPartitionStorage(); + } + } + } + } + + private void moveToNextPartitionStorage() { + currentPartitionStorage = null; + + advance(); Review Comment: This creates a recursion. 
If we have a million empty tables, we could blow up the stack. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java: ########## @@ -103,19 +113,10 @@ public class OutgoingSnapshot { private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new ArrayDeque<>(); /** - * {@link RowId} used to point (most of the time) to the last processed row. More precisely: - * - * <ul> - * <li>Before we started to read from the partition, this is equal to lowest theoretically possible - * {@link RowId} for this partition</li> - * <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was - * sent in this snapshot order</li> - * <li>After we exhausted the partition, this is {@code null}</li> - * </ul> + * Current delivery state of partition data. Can be {@code null} only if the delivery has not started yet.. */ - private RowId lastRowId; - - private boolean startedToReadMvPartition = false; + @Nullable + private PartitionDeliveryState partitionDeliveryState; Review Comment: ```suggestion private PartitionDeliveryState partitionMvDeliveryState; ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java: ########## @@ -195,18 +228,44 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) { - // TODO: implement, see https://issues.apache.org/jira/browse/IGNITE-22416 - throw new UnsupportedOperationException("Snapshotting is not implemented"); Review Comment: Could you please also port the comment from `PartitionListener#onSnapshotSave()`? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; + +import java.util.Collection; +import java.util.Iterator; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.storage.RowId; +import org.jetbrains.annotations.Nullable; + +/** + * Outgoing snapshot delivery state for a given partition. + */ +class PartitionDeliveryState { Review Comment: ```suggestion class PartitionMvDeliveryState { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org