sashapolo commented on code in PR #4530: URL: https://github.com/apache/ignite-3/pull/4530#discussion_r1798890695
########## modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java: ########## @@ -312,14 +314,22 @@ private List<Node> startNodes(int amount) { .map(addr -> ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder)) .forEach(clusterService -> nodes.add(new Node(clusterService, raftConfiguration, workDir))); + return nodes; + } + + /** + * Starts nodes. It is important to call {@link #prepareNodes(int)}, then configure required mocks, and only then star nodes. Otherwise Review Comment: ```suggestion * Starts nodes. It is important to call {@link #prepareNodes(int)}, then configure required mocks, and only then start nodes. Otherwise ``` ########## modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java: ########## @@ -312,14 +314,22 @@ private List<Node> startNodes(int amount) { .map(addr -> ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder)) .forEach(clusterService -> nodes.add(new Node(clusterService, raftConfiguration, workDir))); + return nodes; + } + + /** + * Starts nodes. It is important to call {@link #prepareNodes(int)}, then configure required mocks, and only then star nodes. Otherwise + * the {@link Node#mockStorage} (for example) will be concurrently used by a node and configured in the test (using a Review Comment: I think this description is a bit excessive. It would be sufficient to say that mocks must be configured *before* there will be any calls to them ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java: ########## @@ -220,6 +229,14 @@ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) { @Override public boolean onSnapshotLoad(Path path) { + // Startup snapshot should always be ignored. Review Comment: Why? 
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java: ########## @@ -172,49 +178,55 @@ private void handleNonCachedWriteCommand(CommandClosure<WriteCommand> clo) { * * @param clo Command closure. * @param command Command. + * @param index Command index. + * @param term Command term. */ - private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command) { + private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command, long index, long term) { HybridTimestamp opTime = command.safeTime(); + KeyValueUpdateContext context = new KeyValueUpdateContext(index, term, opTime); + if (command instanceof PutCommand) { PutCommand putCmd = (PutCommand) command; - storage.put(toByteArray(putCmd.key()), toByteArray(putCmd.value()), opTime); + storage.put(toByteArray(putCmd.key()), toByteArray(putCmd.value()), context); clo.result(null); } else if (command instanceof PutAllCommand) { PutAllCommand putAllCmd = (PutAllCommand) command; - storage.putAll(toByteArrayList(putAllCmd.keys()), toByteArrayList(putAllCmd.values()), opTime); + storage.putAll(toByteArrayList(putAllCmd.keys()), toByteArrayList(putAllCmd.values()), context); clo.result(null); } else if (command instanceof RemoveCommand) { RemoveCommand rmvCmd = (RemoveCommand) command; - storage.remove(toByteArray(rmvCmd.key()), opTime); + storage.remove(toByteArray(rmvCmd.key()), context); clo.result(null); } else if (command instanceof RemoveAllCommand) { RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command; - storage.removeAll(toByteArrayList(rmvAllCmd.keys()), opTime); + storage.removeAll(toByteArrayList(rmvAllCmd.keys()), context); clo.result(null); } else if (command instanceof InvokeCommand) { InvokeCommand cmd = (InvokeCommand) command; - clo.result(storage.invoke(toCondition(cmd.condition()), cmd.success(), cmd.failure(), opTime, cmd.id())); + 
clo.result(storage.invoke(toCondition(cmd.condition()), cmd.success(), cmd.failure(), context, cmd.id())); } else if (command instanceof MultiInvokeCommand) { MultiInvokeCommand cmd = (MultiInvokeCommand) command; - clo.result(storage.invoke(toIf(cmd.iif()), opTime, cmd.id())); + clo.result(storage.invoke(toIf(cmd.iif()), context, cmd.id())); } else if (command instanceof SyncTimeCommand) { - storage.advanceSafeTime(command.safeTime()); + storage.setIndexAndTerm(index, term); + + storage.advanceSafeTime(opTime); Review Comment: Would it make sense for the `advanceSafeTime` to also accept a context? Otherwise it's easy to forget that you need to update index and term manually ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metastorage.impl.raft; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.raft.IndexWithTerm; +import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.raft.jraft.RaftMessagesFactory; +import org.apache.ignite.raft.jraft.entity.RaftOutter; +import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; +import org.apache.ignite.raft.jraft.option.RaftOptions; +import org.apache.ignite.raft.jraft.storage.SnapshotStorage; +import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; +import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot storage factory implementation for Meta Storage. + */ +public class MetaStorageSnapshotStorageFactory implements SnapshotStorageFactory { + private final KeyValueStorage storage; + + /** Snapshot meta, constructed from the storage data and raft group configuration at startup. {@code null} if the storage is empty. */ + private final @Nullable RaftOutter.SnapshotMeta startupSnapshotMeta; + + /** + * Constructor. + * + * @param storage Key-value storage instance. 
+ */ + public MetaStorageSnapshotStorageFactory(KeyValueStorage storage) { + this.storage = storage; + + startupSnapshotMeta = readStartupSnapshotMeta(); + } + + private @Nullable SnapshotMeta readStartupSnapshotMeta() { + IndexWithTerm indexWithTerm = storage.getIndexWithTerm(); + + if (indexWithTerm == null) { + return null; + } + + byte[] configBytes = storage.getConfiguration(); + assert configBytes != null; + + RaftGroupConfiguration configuration = ByteUtils.fromBytes(configBytes); + assert configuration != null; + + return new RaftMessagesFactory().snapshotMeta() + .lastIncludedIndex(indexWithTerm.index()) + .lastIncludedTerm(indexWithTerm.term()) + .peersList(configuration.peers()) + .oldPeersList(configuration.oldPeers()) + .learnersList(configuration.learners()) + .oldLearnersList(configuration.oldLearners()) + .build(); + } + + /** + * Returns a snapshot meta, constructed from the storage data and raft group configuration at startup. + */ + SnapshotMeta startupSnapshotMeta() { + if (startupSnapshotMeta == null) { + throw new IllegalStateException("Storage is empty, so startup snapshot should not be read"); + } + + return startupSnapshotMeta; + } + + @Override + public @Nullable SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) { + return new LocalSnapshotStorage(uri, raftOptions) { + private final AtomicBoolean startupSnapshotOpened = new AtomicBoolean(false); + + @Override + public SnapshotReader open() { + if (startupSnapshotOpened.compareAndSet(false, true)) { + if (startupSnapshotMeta == null) { + // The storage is empty, let's behave how JRaft does: return null, avoiding an attempt to load a snapshot + // when it's not there. + return null; + } + + return new StartupMetaStorageSnapshotReader(MetaStorageSnapshotStorageFactory.this); Review Comment: This is so weird. Why don't we pass the `startupSnapshotMeta` here and instead have this circular reference? 
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/SnapshotStorageFactory.java: ########## @@ -28,7 +28,7 @@ public interface SnapshotStorageFactory { * * @param uri Snapshot URI. * @param raftOptions Raft options. - * @return Snapshot storage. + * @return Snapshot storage. {@code null} if there's no snapshot. */ SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions); Review Comment: ```suggestion @Nullable SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions); ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server; + +import org.apache.ignite.internal.hlc.HybridTimestamp; + +/** + * Operation context for update operations in {@link KeyValueStorage}. Includes operation timestamp and a necessary metadata in terms of a Review Comment: ```suggestion * Operation context for update operations in {@link KeyValueStorage}. 
Includes operation timestamp and necessary metadata in terms of an ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -503,26 +522,91 @@ public long revision() { } @Override - public void put(byte[] key, byte[] value, HybridTimestamp opTs) { + public void put(byte[] key, byte[] value, KeyValueUpdateContext context) { rwLock.writeLock().lock(); try (WriteBatch batch = new WriteBatch()) { long newChecksum = checksum.wholePut(key, value); long curRev = rev + 1; - addDataToBatch(batch, key, value, curRev, opTs); + addDataToBatch(batch, key, value, curRev, context.timestamp); updateKeysIndex(batch, key, curRev); - completeAndWriteBatch(batch, curRev, opTs, newChecksum); + completeAndWriteBatch(batch, curRev, context, newChecksum); + } catch (RocksDBException e) { + throw new MetaStorageException(OP_EXECUTION_ERR, e); + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public void setIndexAndTerm(long index, long term) { + rwLock.writeLock().lock(); + + try (WriteBatch batch = new WriteBatch()) { + data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, index, term)); + + db.write(defaultWriteOptions, batch); + } catch (RocksDBException e) { + throw new MetaStorageException(OP_EXECUTION_ERR, e); + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public @Nullable IndexWithTerm getIndexWithTerm() { + rwLock.readLock().lock(); + + try { + byte[] bytes = data.get(INDEX_AND_TERM_KEY); + + if (bytes == null) { + return null; + } + + long[] longs = getAsLongs(bytes); Review Comment: It's a bit strange that we need an intermediate array here. Can we simply read two longs into two variables? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server; + +import org.apache.ignite.internal.hlc.HybridTimestamp; + +/** + * Operation context for update operations in {@link KeyValueStorage}. Includes operation timestamp and a necessary metadata in terms of a + * {@code long index} and {@code long term}. Review Comment: ```suggestion * {@code long index} and a {@code long term}. ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1659,6 +1751,25 @@ public long checksum(long revision) { } } + @Override + public void clear() { Review Comment: Do we have any tests for this method? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/RaftGroupConfiguration.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.impl.raft; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.jetbrains.annotations.Nullable; + +/** + * A RAFT group configuration at which it was committed. + */ +public class RaftGroupConfiguration implements Serializable { Review Comment: I'm pretty sure I've seen an identical class somewhere... ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/raft/MetaStorageSnapshotStorageFactory.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metastorage.impl.raft; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.raft.IndexWithTerm; +import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.raft.jraft.RaftMessagesFactory; +import org.apache.ignite.raft.jraft.entity.RaftOutter; +import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; +import org.apache.ignite.raft.jraft.option.RaftOptions; +import org.apache.ignite.raft.jraft.storage.SnapshotStorage; +import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; +import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage; +import org.jetbrains.annotations.Nullable; + +/** + * Snapshot storage factory implementation for Meta Storage. + */ +public class MetaStorageSnapshotStorageFactory implements SnapshotStorageFactory { + private final KeyValueStorage storage; + + /** Snapshot meta, constructed from the storage data and raft group configuration at startup. {@code null} if the storage is empty. */ + private final @Nullable RaftOutter.SnapshotMeta startupSnapshotMeta; + + /** + * Constructor. + * + * @param storage Key-value storage instance. 
+ */ + public MetaStorageSnapshotStorageFactory(KeyValueStorage storage) { + this.storage = storage; + + startupSnapshotMeta = readStartupSnapshotMeta(); + } + + private @Nullable SnapshotMeta readStartupSnapshotMeta() { + IndexWithTerm indexWithTerm = storage.getIndexWithTerm(); + + if (indexWithTerm == null) { + return null; + } + + byte[] configBytes = storage.getConfiguration(); + assert configBytes != null; + + RaftGroupConfiguration configuration = ByteUtils.fromBytes(configBytes); + assert configuration != null; + + return new RaftMessagesFactory().snapshotMeta() + .lastIncludedIndex(indexWithTerm.index()) + .lastIncludedTerm(indexWithTerm.term()) + .peersList(configuration.peers()) + .oldPeersList(configuration.oldPeers()) + .learnersList(configuration.learners()) + .oldLearnersList(configuration.oldLearners()) + .build(); + } + + /** + * Returns a snapshot meta, constructed from the storage data and raft group configuration at startup. + */ + SnapshotMeta startupSnapshotMeta() { + if (startupSnapshotMeta == null) { + throw new IllegalStateException("Storage is empty, so startup snapshot should not be read"); + } + + return startupSnapshotMeta; + } + + @Override + public @Nullable SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) { + return new LocalSnapshotStorage(uri, raftOptions) { + private final AtomicBoolean startupSnapshotOpened = new AtomicBoolean(false); + + @Override + public SnapshotReader open() { + if (startupSnapshotOpened.compareAndSet(false, true)) { + if (startupSnapshotMeta == null) { + // The storage is empty, let's behave how JRaft does: return null, avoiding an attempt to load a snapshot + // when it's not there. 
+ return null; + } + + return new StartupMetaStorageSnapshotReader(MetaStorageSnapshotStorageFactory.this); Review Comment: Moreover, I would expect the `StartupMetaStorageSnapshotReader` itself to read the startup meta; instead, this is done by the factory for some reason. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org