rpuch commented on code in PR #5212:
URL: https://github.com/apache/ignite-3/pull/5212#discussion_r1952514186


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -242,8 +248,11 @@ private static class Listeners {
 
         final ZonePartitionRaftListener raftListener;
 
-        Listeners(ZonePartitionRaftListener raftListener) {
+        final PartitionSnapshotStorageFactory snapshotStorageFactory;
+
+        Listeners(ZonePartitionRaftListener raftListener, PartitionSnapshotStorageFactory snapshotStorageFactory) {

Review Comment:
   `Listeners` no longer describes the class accurately, so I suggest renaming it. How about `ReplicaContext` or something similar?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -46,84 +51,101 @@
  * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception.
  */
 public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
+    private final PartitionKey partitionKey;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
     /** Snapshot manager. */
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
-    /** Partition storage. */
-    private final PartitionAccess partition;
+    /**
+     * Partition storages grouped by table ID.
+     */
+    private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>());
+
+    private final PartitionTxStateAccess txStateStorage;
 
     private final CatalogService catalogService;
 
-    private final @Nullable SnapshotMeta startupSnapshotMeta;
-
     /** Incoming snapshots executor. */
     private final Executor incomingSnapshotsExecutor;
 
-    /**
-     * Constructor.
-     *
-     * @param topologyService Topology service.
-     * @param outgoingSnapshotsManager Snapshot manager.
-     * @param partition MV partition storage.
-     * @param catalogService Access to the Catalog.
-     * @param incomingSnapshotsExecutor Incoming snapshots executor.
-     * @see SnapshotMeta
-     */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    /** Constructor. */
     public PartitionSnapshotStorageFactory(
+            PartitionKey partitionKey,
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
-            PartitionAccess partition,
+            PartitionTxStateAccess txStateStorage,
             CatalogService catalogService,
             Executor incomingSnapshotsExecutor
     ) {
+        this.partitionKey = partitionKey;
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
-        this.partition = partition;
+        this.txStateStorage = txStateStorage;
         this.catalogService = catalogService;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
+    }
 
-        // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
-        // lowest applied index and thus no data loss occurs.
-        long lastIncludedRaftIndex = partition.minLastAppliedIndex();
-        long lastIncludedRaftTerm = partition.minLastAppliedTerm();
-
-        int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
-
-        if (lastIncludedRaftIndex == 0) {
-            startupSnapshotMeta = null;
-        } else {
-            startupSnapshotMeta = snapshotMetaAt(
-                    lastIncludedRaftIndex,
-                    lastIncludedRaftTerm,
-                    Objects.requireNonNull(partition.committedGroupConfiguration()),
-                    lastCatalogVersionAtStart,
-                    collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart),
-                    partition.leaseStartTime(),
-                    partition.primaryReplicaNodeId(),
-                    partition.primaryReplicaNodeName()
-            );
-        }
+    /**
+     * Adds a given table partition storage to the snapshot storage, managed by this factory.

Review Comment:
   Would it make sense to document how this method should be called with respect to concurrency? For example, under which lock the call must be made, or whether it is completely thread-safe.
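One way to settle this is to make the methods thread-safe and state that in the Javadoc. A minimal sketch, assuming a `ConcurrentHashMap` in place of the synchronized fastutil map; `MvPartitionRegistry` and the type parameter `S` are hypothetical stand-ins for the factory and `PartitionStorageAccess`. Iterators over a `ConcurrentHashMap`'s views are weakly consistent and never throw `ConcurrentModificationException`, whereas iterating a synchronized wrapper (as with `Collections.synchronizedMap`) generally requires holding the map's lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of a documented concurrency contract for adding and removing
 * table partition storages. {@code S} stands in for PartitionStorageAccess.
 */
class MvPartitionRegistry<S> {
    private final Map<Integer, S> partitionsByTableId = new ConcurrentHashMap<>();

    /**
     * Adds a table partition storage. Thread-safe: may be called concurrently with other
     * add/remove calls and with snapshot creation; no external lock is required.
     *
     * @throws IllegalStateException If a storage for the given table is already registered.
     */
    void addMvPartition(int tableId, S storage) {
        // putIfAbsent is atomic, so a duplicate is detected without a check-then-act race.
        S prev = partitionsByTableId.putIfAbsent(tableId, storage);

        if (prev != null) {
            throw new IllegalStateException("Partition storage for table ID " + tableId + " already exists.");
        }
    }

    /** Removes a table partition storage, if present. Thread-safe. */
    void removeMvPartition(int tableId) {
        partitionsByTableId.remove(tableId);
    }

    /** Returns a copy of the registered storages (weakly consistent under concurrent modification). */
    List<S> storages() {
        return new ArrayList<>(partitionsByTableId.values());
    }
}
```

Using `putIfAbsent` instead of `put` followed by an `assert` also means a duplicate registration never silently replaces the existing storage, even when assertions are disabled.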



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -490,29 +503,29 @@ private CompletableFuture<Void> completeRebalance(@Nullable Throwable throwable)
                     raftGroupConfig
             );
 
-            return partitionSnapshotStorage.partition().finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta, raftGroupConfig));
+            return finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta, raftGroupConfig));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     private int partId() {
-        return partitionSnapshotStorage.partition().partitionKey().partitionId();
+        return partitionSnapshotStorage.partitionKey().partitionId();
     }
 
     private String createPartitionInfo() {
-        return "tableId=" + partitionSnapshotStorage.partition().partitionKey().tableId() + ", partitionId=" + partId();
+        return partitionSnapshotStorage.partitionKey().toString();
     }
 
-    private void writeVersion(ResponseEntry entry, int i) {
+    private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int i) {

Review Comment:
   ```suggestion
       private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int entryIndex) {
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionStorageAccess.java:
##########
@@ -31,33 +30,17 @@
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage.
  */
-public interface PartitionAccess {
-    /**
-     * Returns the key that uniquely identifies the corresponding partition.
-     */
-    PartitionKey partitionKey();
-
-    /**
-     * Creates a cursor to scan all meta of transactions.
-     *
-     * <p>All metas that exist at the time the method is called will be returned, in transaction ID order as an unsigned 128 bit integer.
-     */
-    Cursor<IgniteBiTuple<UUID, TxMeta>> getAllTxMeta();
+public interface PartitionStorageAccess {

Review Comment:
   ```suggestion
   public interface PartitionMvStorageAccess {
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -486,17 +509,16 @@ public boolean addRowIdToSkip(RowId rowId) {
      * @param rowId RowId.
      * @return {@code true} if the given RowId is already passed by the snapshot in normal rows sending order.
      */
-    public boolean alreadyPassed(RowId rowId) {
+    public boolean alreadyPassed(int tableId, RowId rowId) {
         assert mvOperationsLock.isLocked() : "MV operations lock must be acquired!";
 
-        if (!startedToReadMvPartition) {
-            return false;
-        }
         if (finishedMvData()) {
             return true;
         }
 
-        return rowId.compareTo(lastRowId) <= 0;
+        return partitionDeliveryState != null
+                && tableId <= partitionDeliveryState.currentTableId()
+                && rowId.compareTo(partitionDeliveryState.currentRowId()) <= 0;

Review Comment:
   Imagine we have two tables with IDs 1 and 2, and we are currently sending the very beginning of table 2. `alreadyPassed(1, hugeRowId)` must return `true` (we are already working on table 2), but this expression could evaluate to `false`, because `currentRowId()` might be a `tinyRowId` with `tinyRowId < hugeRowId`.
   
   Let's add a unit test for this.
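A minimal sketch of a comparison that handles the scenario above, assuming table partition storages are delivered in ascending table-ID order. The diff does not show whether `partitionsByTableId.values()` iterates in that order, and an `Int2ObjectOpenHashMap` does not guarantee ascending key order, so that assumption needs checking too. Row IDs are modeled as plain `long`s, and `DeliveryPosition` is a hypothetical stand-in for `PartitionDeliveryState`.

```java
/**
 * Hypothetical model of OutgoingSnapshot#alreadyPassed. Assumes table partition storages are
 * sent in ascending table-ID order and models row IDs as longs.
 */
final class AlreadyPassedSketch {
    /** Position of the snapshot cursor within the MV data: current table and current row. */
    static final class DeliveryPosition {
        final int currentTableId;
        final long currentRowId;

        DeliveryPosition(int currentTableId, long currentRowId) {
            this.currentTableId = currentTableId;
            this.currentRowId = currentRowId;
        }
    }

    /**
     * @param position Cursor position, or {@code null} if sending MV data has not started yet.
     * @param finishedMvData Whether all MV data has already been sent.
     */
    static boolean alreadyPassed(DeliveryPosition position, boolean finishedMvData, int tableId, long rowId) {
        if (finishedMvData) {
            return true;
        }

        if (position == null) {
            return false;
        }

        if (tableId != position.currentTableId) {
            // Every table before the current one has been sent completely, whatever the row ID.
            return tableId < position.currentTableId;
        }

        // Same table: the row has been passed if it is not after the cursor.
        return rowId <= position.currentRowId;
    }

    /** The expression from the diff, kept for comparison. */
    static boolean originalExpression(DeliveryPosition position, int tableId, long rowId) {
        return position != null
                && tableId <= position.currentTableId
                && rowId <= position.currentRowId;
    }
}
```

A unit test for the real class could cover the same cases: an earlier table with a large row ID, a later table, and rows before and after the cursor within the current table.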



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -239,4 +298,22 @@ private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosed
             // No-op.
         }
     }
+
+    private void cleanupSnapshots() {

Review Comment:
   This method (and a couple of others) is copy-pasted from `SnapshotAwarePartitionDataStorage`. Would it make sense to split `SnapshotAwarePartitionDataStorage` into two parts and reuse one of them here?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -46,84 +51,101 @@
  * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception.
  */
 public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
+    private final PartitionKey partitionKey;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
     /** Snapshot manager. */
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
-    /** Partition storage. */
-    private final PartitionAccess partition;
+    /**
+     * Partition storages grouped by table ID.
+     */
+    private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>());
+
+    private final PartitionTxStateAccess txStateStorage;
 
     private final CatalogService catalogService;
 
-    private final @Nullable SnapshotMeta startupSnapshotMeta;
-
     /** Incoming snapshots executor. */
     private final Executor incomingSnapshotsExecutor;
 
-    /**
-     * Constructor.
-     *
-     * @param topologyService Topology service.
-     * @param outgoingSnapshotsManager Snapshot manager.
-     * @param partition MV partition storage.
-     * @param catalogService Access to the Catalog.
-     * @param incomingSnapshotsExecutor Incoming snapshots executor.
-     * @see SnapshotMeta
-     */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    /** Constructor. */
     public PartitionSnapshotStorageFactory(
+            PartitionKey partitionKey,
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
-            PartitionAccess partition,
+            PartitionTxStateAccess txStateStorage,
             CatalogService catalogService,
             Executor incomingSnapshotsExecutor
     ) {
+        this.partitionKey = partitionKey;
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
-        this.partition = partition;
+        this.txStateStorage = txStateStorage;
         this.catalogService = catalogService;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
+    }
 
-        // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
-        // lowest applied index and thus no data loss occurs.
-        long lastIncludedRaftIndex = partition.minLastAppliedIndex();
-        long lastIncludedRaftTerm = partition.minLastAppliedTerm();
-
-        int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
-
-        if (lastIncludedRaftIndex == 0) {
-            startupSnapshotMeta = null;
-        } else {
-            startupSnapshotMeta = snapshotMetaAt(
-                    lastIncludedRaftIndex,
-                    lastIncludedRaftTerm,
-                    Objects.requireNonNull(partition.committedGroupConfiguration()),
-                    lastCatalogVersionAtStart,
-                    collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart),
-                    partition.leaseStartTime(),
-                    partition.primaryReplicaNodeId(),
-                    partition.primaryReplicaNodeName()
-            );
-        }
+    /**
+     * Adds a given table partition storage to the snapshot storage, managed by this factory.
+     */
+    public void addMvPartition(int tableId, PartitionStorageAccess partition) {
+        PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition);
+
+        assert prev == null : "Partition storage for table ID " + tableId + " already exists.";
+    }
+
+    public void removeMvPartition(int tableId) {
+        partitionsByTableId.remove(tableId);
     }
 
     @Override
     public @Nullable PartitionSnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
         return new PartitionSnapshotStorage(
+                partitionKey,
                 topologyService,
                 outgoingSnapshotsManager,
                 uri,
                 raftOptions,
-                partition,
+                partitionsByTableId,
+                txStateStorage,
                 catalogService,
-                startupSnapshotMeta,
+                createStartupSnapshotMeta(),
                 incomingSnapshotsExecutor
         );
     }
 
+    private @Nullable SnapshotMeta createStartupSnapshotMeta() {
+        // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
+        // lowest applied index and thus no data loss occurs.
+        return partitionsByTableId.values().stream()
+                .min(comparingLong(PartitionStorageAccess::lastAppliedIndex))
+                .map(storageWithMinLastAppliedIndex -> {
+                    long minLastAppliedIndex = min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
+
+                    if (minLastAppliedIndex == 0) {
+                        return null;
+                    }
+
+                    int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
+
+                    return snapshotMetaAt(
+                            min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()),

Review Comment:
   ```suggestion
                               minLastAppliedIndex,
   ```
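Computing the minimum once also makes the zero check and the snapshot index agree by construction. A minimal sketch, assuming the applied indexes are passed in as plain `long`s; `startupSnapshotIndex` is a hypothetical helper, not part of the PR, and it mirrors the diff's behavior of producing no startup snapshot meta when there are no table storages.

```java
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

final class StartupSnapshotIndexSketch {
    /**
     * Returns the Raft index the startup snapshot meta should use, or empty if no meta should be created.
     *
     * @param mvLastAppliedIndexes Last applied indexes of the table partition storages.
     * @param txStateLastAppliedIndex Last applied index of the tx state storage.
     */
    static OptionalLong startupSnapshotIndex(List<Long> mvLastAppliedIndexes, long txStateLastAppliedIndex) {
        // Mirrors the diff: with no table storages, min() over the stream is empty and no meta is produced.
        if (mvLastAppliedIndexes.isEmpty()) {
            return OptionalLong.empty();
        }

        // Take the minimum over all storages so that no Raft command is skipped during local recovery.
        long minLastAppliedIndex = Math.min(Collections.min(mvLastAppliedIndexes), txStateLastAppliedIndex);

        // Computed once, then reused for both the zero check and the snapshot index.
        return minLastAppliedIndex == 0 ? OptionalLong.empty() : OptionalLong.of(minLastAppliedIndex);
    }
}
```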



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -46,84 +51,101 @@
  * {@code true}, and {@link SnapshotWriter#addFile(String)} throws an exception.
  */
 public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
+    private final PartitionKey partitionKey;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
     /** Snapshot manager. */
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
-    /** Partition storage. */
-    private final PartitionAccess partition;
+    /**
+     * Partition storages grouped by table ID.
+     */
+    private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>());
+
+    private final PartitionTxStateAccess txStateStorage;
 
     private final CatalogService catalogService;
 
-    private final @Nullable SnapshotMeta startupSnapshotMeta;
-
     /** Incoming snapshots executor. */
     private final Executor incomingSnapshotsExecutor;
 
-    /**
-     * Constructor.
-     *
-     * @param topologyService Topology service.
-     * @param outgoingSnapshotsManager Snapshot manager.
-     * @param partition MV partition storage.
-     * @param catalogService Access to the Catalog.
-     * @param incomingSnapshotsExecutor Incoming snapshots executor.
-     * @see SnapshotMeta
-     */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    /** Constructor. */
     public PartitionSnapshotStorageFactory(
+            PartitionKey partitionKey,
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
-            PartitionAccess partition,
+            PartitionTxStateAccess txStateStorage,
             CatalogService catalogService,
             Executor incomingSnapshotsExecutor
     ) {
+        this.partitionKey = partitionKey;
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
-        this.partition = partition;
+        this.txStateStorage = txStateStorage;
         this.catalogService = catalogService;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
+    }
 
-        // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
-        // lowest applied index and thus no data loss occurs.
-        long lastIncludedRaftIndex = partition.minLastAppliedIndex();
-        long lastIncludedRaftTerm = partition.minLastAppliedTerm();
-
-        int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
-
-        if (lastIncludedRaftIndex == 0) {
-            startupSnapshotMeta = null;
-        } else {
-            startupSnapshotMeta = snapshotMetaAt(
-                    lastIncludedRaftIndex,
-                    lastIncludedRaftTerm,
-                    Objects.requireNonNull(partition.committedGroupConfiguration()),
-                    lastCatalogVersionAtStart,
-                    collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart),
-                    partition.leaseStartTime(),
-                    partition.primaryReplicaNodeId(),
-                    partition.primaryReplicaNodeName()
-            );
-        }
+    /**
+     * Adds a given table partition storage to the snapshot storage, managed by this factory.
+     */
+    public void addMvPartition(int tableId, PartitionStorageAccess partition) {
+        PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition);
+
+        assert prev == null : "Partition storage for table ID " + tableId + " already exists.";
+    }
+
+    public void removeMvPartition(int tableId) {

Review Comment:
   Not used anywhere yet. Will it be used once removing a table from a zone is implemented?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java:
##########
@@ -270,18 +278,25 @@ public void close() {
 
     @Override
     public SnapshotReader getReader() {
+        CompletableFuture<PartitionSnapshotMeta> snapshotMetaFuture = this.snapshotMetaFuture;
+
         // This one's called when "join" is complete.
-        return new IncomingSnapshotReader(snapshotMeta);
+        assert snapshotMetaFuture != null && snapshotMetaFuture.isDone();
+
+        // Use 'null' if we failed or were cancelled, this is what JRaft expects.
+        PartitionSnapshotMeta meta = snapshotMetaFuture.isCompletedExceptionally() ? null : snapshotMetaFuture.join();
+
+        return new IncomingSnapshotReader(meta);
     }
 
     private @Nullable ClusterNode getSnapshotSender(String nodeName) {
         return partitionSnapshotStorage.topologyService().getByConsistentId(nodeName);
     }
 
     /**
-     * Requests and saves the snapshot meta in {@link #snapshotMeta}.
+     * Requests and the snapshot meta.

Review Comment:
   This sentence seems to be broken: "Requests and the snapshot meta" is missing a verb.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java:
##########
@@ -53,6 +54,28 @@ void onConfigurationCommitted(
             long lastAppliedTerm
     );
 
+    /**
+     * Returns the last applied Raft log index.
+     */
+    long lastAppliedIndex();
+
+    /**
+     * Returns the last applied Raft term.

Review Comment:
   ```suggestion
        * Returns the term of the last applied Raft index.
   ```
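The distinction matters because the current Raft term can advance (for example, after a leader election) without any new entry being applied, while the term that belongs to the last applied index stays the same. A minimal sketch with a hypothetical `LastAppliedTracker` class illustrating that contract:

```java
/**
 * Hypothetical tracker illustrating the contract: lastAppliedTerm() is the term of the entry at
 * lastAppliedIndex(), not the node's current term.
 */
final class LastAppliedTracker {
    private long lastAppliedIndex;
    private long lastAppliedTerm;

    /** Called when the log entry at {@code index}, written in {@code term}, has been applied. */
    void onApplied(long index, long term) {
        if (index <= lastAppliedIndex) {
            throw new IllegalArgumentException("Entries must be applied in increasing index order: " + index);
        }

        lastAppliedIndex = index;
        lastAppliedTerm = term;
    }

    /** Returns the last applied Raft log index. */
    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    /** Returns the term of the last applied Raft index. */
    long lastAppliedTerm() {
        return lastAppliedTerm;
    }
}
```

For instance, after applying entries (index 1, term 1), (2, 1) and (3, 2), `lastAppliedTerm()` returns 2 even if an election has since moved the node to term 3.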



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot;
+
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Uniquely identifies a partition. This is a pair of zone ID and partition number (aka partition ID).
+ */
+public class ZonePartitionKey implements PartitionKey {
+    private final int zoneId;
+
+    private final int partitionId;
+
+    /**
+     * Returns ID of the zone.
+     */
+    public int tableId() {

Review Comment:
   Is the name OK? The method returns the zone ID but is called `tableId()`.
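If the `PartitionKey` interface does not force a `tableId()` method on its implementations (its declaration is not part of this diff), the zone key could expose an accessor named after what it returns. A minimal sketch; `ZonePartitionKeySketch` is a hypothetical standalone class and does not implement `PartitionKey`.

```java
import java.util.Objects;

/** Hypothetical sketch: uniquely identifies a partition as a pair of zone ID and partition ID. */
final class ZonePartitionKeySketch {
    private final int zoneId;

    private final int partitionId;

    ZonePartitionKeySketch(int zoneId, int partitionId) {
        this.zoneId = zoneId;
        this.partitionId = partitionId;
    }

    /** Returns the ID of the zone. */
    int zoneId() {
        return zoneId;
    }

    /** Returns the partition ID within the zone. */
    int partitionId() {
        return partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ZonePartitionKeySketch that = (ZonePartitionKeySketch) o;
        return zoneId == that.zoneId && partitionId == that.partitionId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(zoneId, partitionId);
    }

    @Override
    public String toString() {
        return "ZonePartitionKeySketch [zoneId=" + zoneId + ", partitionId=" + partitionId + "]";
    }
}
```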



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess;
+import org.apache.ignite.internal.storage.RowId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Outgoing snapshot delivery state for a given partition.
+ */
+class PartitionDeliveryState {
+    private final Iterator<PartitionStorageAccess> partitionStoragesIterator;
+
+    /**
+     * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private RowId currentRowId;
+
+    /**
+     * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private PartitionStorageAccess currentPartitionStorage;
+
+    PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) {
+        this.partitionStoragesIterator = partitionStorages.iterator();
+
+        advance();
+    }
+
+    RowId currentRowId() {
+        assert currentRowId != null;
+
+        return currentRowId;
+    }
+
+    PartitionStorageAccess currentPartitionStorage() {
+        assert currentPartitionStorage != null;
+
+        return currentPartitionStorage;
+    }
+
+    int currentTableId() {
+        return currentPartitionStorage().tableId();
+    }
+
+    boolean isEmpty() {

Review Comment:
   ```suggestion
       boolean isExhausted() {
   ```
   That is, it *became* empty because we have sent all the data.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -162,30 +170,38 @@ void freezeScopeUnderMvLock() {
         try {
             frozenMeta = takeSnapshotMeta();
 
-            txDataCursor = partition.getAllTxMeta();
+            txDataCursor = txState.getAllTxMeta();
         } finally {
             releaseMvLock();
         }
     }
 
     private PartitionSnapshotMeta takeSnapshotMeta() {
-        RaftGroupConfiguration config = partition.committedGroupConfiguration();
+        PartitionStorageAccess partitionStorageWithMaxAppliedIndex = partitionsByTableId.values().stream()
+                .max(comparingLong(PartitionStorageAccess::lastAppliedIndex))
+                .orElseThrow();

Review Comment:
   How is it guaranteed that `partitionsByTableId` is not empty? What if a zone was created but doesn't contain any tables yet?
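`Stream.max` returns an empty `Optional` for an empty stream, so `orElseThrow()` would throw `NoSuchElementException` for a zone without tables. One option is to surface the empty case to the caller. A minimal sketch; the fallback index (for example, one taken from the tx state storage) is an assumption, since the diff does not show what the meta should contain for a zone without tables.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.function.ToLongFunction;

final class MaxAppliedIndexSketch {
    /**
     * Returns the storage with the highest last applied index, or empty if there are no storages
     * (e.g. a zone that does not contain any tables yet).
     */
    static <S> Optional<S> storageWithMaxAppliedIndex(Collection<S> storages, ToLongFunction<S> lastAppliedIndex) {
        // Returning the Optional (instead of calling orElseThrow()) forces callers to handle an empty zone.
        return storages.stream().max(Comparator.comparingLong(lastAppliedIndex));
    }

    /** Hypothetical caller: uses {@code fallbackIndex} (e.g. from the tx state storage) when there are no storages. */
    static <S> long maxAppliedIndexOr(Collection<S> storages, ToLongFunction<S> lastAppliedIndex, long fallbackIndex) {
        return storageWithMaxAppliedIndex(storages, lastAppliedIndex)
                .map(lastAppliedIndex::applyAsLong)
                .orElse(fallbackIndex);
    }
}
```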



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess;
+import org.apache.ignite.internal.storage.RowId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Outgoing snapshot delivery state for a given partition.
+ */
+class PartitionDeliveryState {
+    private final Iterator<PartitionStorageAccess> partitionStoragesIterator;
+
+    /**
+     * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private RowId currentRowId;
+
+    /**
+     * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private PartitionStorageAccess currentPartitionStorage;
+
+    PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) {
+        this.partitionStoragesIterator = partitionStorages.iterator();
+
+        advance();
+    }
+
+    RowId currentRowId() {
+        assert currentRowId != null;
+
+        return currentRowId;
+    }
+
+    PartitionStorageAccess currentPartitionStorage() {
+        assert currentPartitionStorage != null;
+
+        return currentPartitionStorage;
+    }
+
+    int currentTableId() {
+        return currentPartitionStorage().tableId();
+    }
+
+    boolean isEmpty() {
+        return currentPartitionStorage == null;
+    }
+
+    void advance() {
+        if (currentPartitionStorage == null) {
+            if (!partitionStoragesIterator.hasNext()) {
+                return;
+            }
+
+            currentPartitionStorage = partitionStoragesIterator.next();
+
+            currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId()));
+
+            // Partition is empty, try the next one.

Review Comment:
   It seems that the comment should be inside the `if` body.
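For reference, a minimal sketch of the traversal with the comment placed inside the branch it describes. It models each table partition storage as a sorted set of `long` row IDs (`Storage` and `DeliveryStateSketch` are hypothetical stand-ins for `PartitionStorageAccess` and `PartitionDeliveryState`), uses the `isExhausted()` name suggested in the earlier comment, and replaces the diff's `moveToNextPartitionStorage()` call (whose body is not shown) with a loop, so any number of consecutive empty storages is skipped.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical model of PartitionDeliveryState: walks all rows of all storages, skipping empty ones. */
final class DeliveryStateSketch {
    /** Hypothetical stand-in for PartitionStorageAccess: a table ID and its sorted row IDs. */
    static final class Storage {
        final int tableId;
        final NavigableSet<Long> rowIds = new TreeSet<>();

        Storage(int tableId, Long... rows) {
            this.tableId = tableId;
            this.rowIds.addAll(Arrays.asList(rows));
        }

        /** Smallest row ID greater than or equal to {@code lowerBound}, or {@code null} if there is none. */
        Long closestRowId(long lowerBound) {
            return rowIds.ceiling(lowerBound);
        }
    }

    private final Iterator<Storage> storagesIterator;

    private Storage currentStorage;

    private Long currentRowId;

    DeliveryStateSketch(Collection<Storage> storages) {
        this.storagesIterator = storages.iterator();

        advance();
    }

    boolean isExhausted() {
        return currentStorage == null;
    }

    int currentTableId() {
        return currentStorage.tableId;
    }

    long currentRowId() {
        return currentRowId;
    }

    void advance() {
        if (currentStorage != null) {
            // Try the next row within the current storage.
            currentRowId = currentStorage.closestRowId(currentRowId + 1);

            if (currentRowId != null) {
                return;
            }
        }

        while (storagesIterator.hasNext()) {
            currentStorage = storagesIterator.next();
            currentRowId = currentStorage.closestRowId(Long.MIN_VALUE);

            if (currentRowId == null) {
                // Partition is empty, try the next one.
                continue;
            }

            return;
        }

        // All storages have been delivered.
        currentStorage = null;
        currentRowId = null;
    }
}
```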



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -304,23 +320,28 @@ private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) {
         return sum;
     }
 
-    private long tryProcessRowFromPartition(List<SnapshotMvDataResponse.ResponseEntry> batch, long totalBatchSize,
-            SnapshotMvDataRequest request) {
+    private long tryProcessRowFromPartition(
+            List<SnapshotMvDataResponse.ResponseEntry> batch,
+            long totalBatchSize,
+            SnapshotMvDataRequest request
+    ) {
         if (batchIsFull(request, totalBatchSize) || finishedMvData()) {
             return totalBatchSize;
         }
 
-        if (!startedToReadMvPartition) {
-            lastRowId = partition.closestRowId(lastRowId);
-
-            startedToReadMvPartition = true;
+        if (partitionDeliveryState == null) {
+            partitionDeliveryState = new PartitionDeliveryState(partitionsByTableId.values());
         } else {
-            lastRowId = partition.closestRowId(lastRowId.increment());
+            partitionDeliveryState.advance();
         }
 
-        if (!finishedMvData()) {
-            if (!rowIdsToSkip.remove(lastRowId)) {
-                SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(lastRowId);
+        if (!partitionDeliveryState.isEmpty()) {

Review Comment:
   How about using `!finishedMvData()` instead, as before? It has the same effect, but it makes it clearer what the condition checks (and this piece of code is the main reason that method was introduced).



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -554,10 +564,20 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
                     txManager,
                     safeTimeTracker,
                     storageIndexTracker,
-                    zonePartitionId
+                    zonePartitionId,
+                    outgoingSnapshotsManager
             );
 
-            var listeners = new Listeners(raftGroupListener);
+            var snapshotStorageFactory = new PartitionSnapshotStorageFactory(
+                    new ZonePartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId()),

Review Comment:
   Why do we need a key? Can't the partition ID by itself play this role?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java:
##########
@@ -0,0 +1,108 @@
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess;
+import org.apache.ignite.internal.storage.RowId;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Outgoing snapshot delivery state for a given partition.
+ */
+class PartitionDeliveryState {
+    private final Iterator<PartitionStorageAccess> partitionStoragesIterator;
+
+    /**
+     * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private RowId currentRowId;
+
+    /**
+     * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
+     */
+    @Nullable
+    private PartitionStorageAccess currentPartitionStorage;
+
+    PartitionDeliveryState(Collection<PartitionStorageAccess> partitionStorages) {
+        this.partitionStoragesIterator = partitionStorages.iterator();
+
+        advance();
+    }
+
+    RowId currentRowId() {
+        assert currentRowId != null;
+
+        return currentRowId;
+    }
+
+    PartitionStorageAccess currentPartitionStorage() {
+        assert currentPartitionStorage != null;
+
+        return currentPartitionStorage;
+    }
+
+    int currentTableId() {
+        return currentPartitionStorage().tableId();
+    }
+
+    boolean isEmpty() {
+        return currentPartitionStorage == null;
+    }
+
+    void advance() {
+        if (currentPartitionStorage == null) {
+            if (!partitionStoragesIterator.hasNext()) {
+                return;
+            }
+
+            currentPartitionStorage = partitionStoragesIterator.next();
+
+            currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId()));
+
+            // Partition is empty, try the next one.
+            if (currentRowId == null) {
+                moveToNextPartitionStorage();
+            }
+        } else {
+            assert currentRowId != null;
+
+            RowId nextRowId = currentRowId.increment();
+
+            // We've exhausted all possible row IDs in the partition, switch to the next one.
+            if (nextRowId == null) {
+                moveToNextPartitionStorage();
+            } else {
+                currentRowId = currentPartitionStorage.closestRowId(nextRowId);
+
+                // We've read all data from this partition, switch to the next one.
+                if (currentRowId == null) {
+                    moveToNextPartitionStorage();
+                }
+            }
+        }
+    }
+
+    private void moveToNextPartitionStorage() {
+        currentPartitionStorage = null;
+
+        advance();

Review Comment:
   This creates a recursion. If we have a million empty tables, we could blow up the stack.
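   One way to address this is to replace the `advance()` -> `moveToNextPartitionStorage()` -> `advance()` cycle with a loop that skips exhausted and empty storages. The sketch below is a simplified, hypothetical model, not a drop-in patch: `StorageSketch` stands in for `PartitionStorageAccess`, row IDs are modeled as non-negative `long` values, and `Long.MAX_VALUE` having no successor mirrors `RowId.increment()` returning `null`.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for PartitionStorageAccess; row IDs are modeled as non-negative longs.
interface StorageSketch {
    /** Returns the smallest stored row ID that is >= lowerBound, or null if there is none. */
    Long closestRowId(long lowerBound);
}

// Hypothetical in-memory storage backed by an ascending list of row IDs.
final class ListStorageSketch implements StorageSketch {
    private final List<Long> rowIds;

    ListStorageSketch(List<Long> sortedRowIds) {
        this.rowIds = sortedRowIds;
    }

    @Override
    public Long closestRowId(long lowerBound) {
        for (Long id : rowIds) {
            if (id >= lowerBound) {
                return id;
            }
        }
        return null;
    }
}

final class IterativeDeliveryState {
    private final Iterator<? extends StorageSketch> storages;

    // Both fields are null once all data has been delivered.
    private StorageSketch currentStorage;
    private Long currentRowId;

    IterativeDeliveryState(Collection<? extends StorageSketch> storages) {
        this.storages = storages.iterator();
        advance();
    }

    boolean isEmpty() {
        return currentStorage == null;
    }

    long currentRowId() {
        return currentRowId; // Throws NPE if isEmpty(), like the assert in the original.
    }

    /** Moves to the next stored row; empty or exhausted storages are skipped in a loop, not by recursion. */
    void advance() {
        if (currentStorage != null) {
            // Long.MAX_VALUE has no successor, mirroring RowId.increment() returning null.
            currentRowId = currentRowId == Long.MAX_VALUE ? null : currentStorage.closestRowId(currentRowId + 1);
            if (currentRowId != null) {
                return;
            }
            currentStorage = null;
        }
        // Constant stack depth no matter how many empty storages follow.
        while (storages.hasNext()) {
            StorageSketch candidate = storages.next();
            Long first = candidate.closestRowId(0L);
            if (first != null) {
                currentStorage = candidate;
                currentRowId = first;
                return;
            }
        }
        currentRowId = null;
    }
}
```

   With a million empty storages in front of a non-empty one, the loop simply iterates past them, whereas the recursive version would need roughly two stack frames per empty storage.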



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -103,19 +113,10 @@ public class OutgoingSnapshot {
     private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData = new ArrayDeque<>();
 
     /**
-     * {@link RowId} used to point (most of the time) to the last processed row. More precisely:
-     *
-     * <ul>
-     *     <li>Before we started to read from the partition, this is equal to lowest theoretically possible
-     *     {@link RowId} for this partition</li>
-     *     <li>If we started to read from partition AND it is not yet exhausted, this is the last RowId that was
-     *     sent in this snapshot order</li>
-     *     <li>After we exhausted the partition, this is {@code null}</li>
-     * </ul>
+     * Current delivery state of partition data. Can be {@code null} only if the delivery has not started yet..
      */
-    private RowId lastRowId;
-
-    private boolean startedToReadMvPartition = false;
+    @Nullable
+    private PartitionDeliveryState partitionDeliveryState;

Review Comment:
   ```suggestion
       private PartitionDeliveryState partitionMvDeliveryState;
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -195,18 +228,44 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp
 
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        // TODO: implement, see https://issues.apache.org/jira/browse/IGNITE-22416
-        throw new UnsupportedOperationException("Snapshotting is not implemented");

Review Comment:
   Could you please also port the comment from `PartitionListener#onSnapshotSave()`?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java:
##########
@@ -0,0 +1,108 @@
+class PartitionDeliveryState {

Review Comment:
   ```suggestion
   class PartitionMvDeliveryState {
   ```



-- 
This is an automated message from the Apache Git Service.