tkalkirill commented on code in PR #4528: URL: https://github.com/apache/ignite-3/pull/4528#discussion_r1792979542
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java: ########## @@ -27,7 +27,7 @@ */ @Transferable(MetastorageCommandsMessageGroup.INVOKE) public interface InvokeCommand extends IdempotentCommand { Review Comment: Maybe we can shorten the Java method docs a bit and remove the unnecessary `@return`? ########## modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java: ########## @@ -242,7 +242,7 @@ public CompletableFuture<Boolean> write(Map<String, ? extends Serializable> newV return falseCompletedFuture(); } - Set<Operation> operations = new HashSet<>(); + List<Operation> operations = new ArrayList<>(); Review Comment: ```suggestion var operations = new ArrayList<Operation>(); ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -368,13 +384,25 @@ private void createDb() throws RocksDBException { rev = ByteUtils.bytesToLong(revision); } + checksum = new MetastorageChecksum(revision == null ? 0 : checksumByRevision(rev)); + byte[] compactionRevisionBytes = data.get(COMPACTION_REVISION_KEY); if (compactionRevisionBytes != null) { compactionRevision = ByteUtils.bytesToLong(compactionRevisionBytes); } } + private long checksumByRevision(long revision) throws RocksDBException { + byte[] bytes = revisionToChecksum.get(longToBytes(revision)); + + if (bytes == null) { + throw new CompactedException("Revision is compacted: " + revision); Review Comment: ```suggestion throw new CompactedException(revision); ``` ########## modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java: ########## @@ -293,7 +293,7 @@ public boolean invoke( boolean branch = condition.test(e.toArray(new Entry[]{})); Review Comment: The variable name is strange, of course. 
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -191,6 +196,9 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { /** Revision to timestamp mapping column family. */ private volatile ColumnFamily revisionToTs; + /** Revision to checksum mapping column family. */ + private volatile ColumnFamily revisionToChecksum; Review Comment: Let's check how it will work with snapshots in tests: 1. org.apache.ignite.internal.metastorage.server.BasicOperationsKeyValueStorageTest#testSnapshot 2. org.apache.ignite.internal.metastorage.server.BasicOperationsKeyValueStorageTest#testClearDataBeforeRestoreFromSnapshot ########## modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java: ########## @@ -970,6 +970,11 @@ public long getCompactionRevision() { } } + @Override + public @Nullable Long checksum(long revision) { + throw new UnsupportedOperationException(); Review Comment: Let's implement the method to be as universal as possible. ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/InvokeCommand.java: ########## @@ -27,7 +27,7 @@ */ @Transferable(MetastorageCommandsMessageGroup.INVOKE) public interface InvokeCommand extends IdempotentCommand { - /** + /**b Review Comment: ```suggestion /** ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java: ########## @@ -375,4 +375,11 @@ boolean invoke( * @see #saveCompactionRevision(long) */ long getCompactionRevision(); + + /** + * Returns checksum corresponding to the revision, or {@code null} if the revision does not exist yet or was compacted. + * + * @param revision Revision. + */ + @Nullable Integer checksum(long revision); Review Comment: Why not throw `CompactedException` when requesting a compacted revision? 
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java: ########## @@ -375,4 +375,11 @@ boolean invoke( * @see #saveCompactionRevision(long) */ long getCompactionRevision(); + + /** + * Returns checksum corresponding to the revision, or {@code null} if the revision does not exist yet or was compacted. + * + * @param revision Revision. + */ + @Nullable Long checksum(long revision); Review Comment: By analogy with `KeyValueStorage#timestampByRevision`, I would throw `CompactedException` for now. ########## modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java: ########## @@ -17,13 +17,29 @@ package org.apache.ignite.internal.metastorage.server; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + import org.apache.ignite.internal.failure.NoOpFailureManager; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; +import org.junit.jupiter.api.Test; /** Compaction test for the RocksDB implementation of {@link KeyValueStorage}. */ public class RocksDbCompactionKeyValueStorageTest extends AbstractCompactionKeyValueStorageTest { @Override public KeyValueStorage createStorage() { return new RocksDbKeyValueStorage("test", workDir.resolve("storage"), new NoOpFailureManager()); } + + @Test + void checksumsAreRemovedForCompactedRevisions() { Review Comment: I would suggest adding a test for the compaction case to `org.apache.ignite.internal.metastorage.server.AbstractCompactionKeyValueStorageTest`. ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -207,6 +215,8 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { */ private long rev; + private MetastorageChecksum checksum; Review Comment: Specify that it is guarded by the lock.
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -339,7 +353,7 @@ private DBOptions createDbOptions() { private void createDb() throws RocksDBException { List<ColumnFamilyDescriptor> descriptors = cfDescriptors(); - assert descriptors.size() == 4; + assert descriptors.size() == 5; Review Comment: ```suggestion assert descriptors.size() == 5 : descriptors.size(); ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1606,6 +1670,21 @@ public long getCompactionRevision() { } } + @Override + public @Nullable Long checksum(long revision) { + rwLock.readLock().lock(); + + try { + byte[] checksumBytes = revisionToChecksum.get(longToBytes(revision)); + + return checksumBytes == null ? null : bytesToLong(checksumBytes); + } catch (RocksDBException e) { + throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum by revision " + revision, e); Review Comment: ```suggestion throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum by revision: " + revision, e); ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -510,31 +540,46 @@ private void updateKeysIndex(WriteBatch batch, byte[] key, long curRev) { * @param batch Write batch. * @param newRev New revision. * @param ts Operation's timestamp. + * @param newChecksum Checksum corresponding to the revision. * @throws RocksDBException If failed. 
*/ - private void fillAndWriteBatch(WriteBatch batch, long newRev, @Nullable HybridTimestamp ts) throws RocksDBException { + private void completeAndWriteBatch(WriteBatch batch, long newRev, HybridTimestamp ts, long newChecksum) throws RocksDBException { byte[] revisionBytes = longToBytes(newRev); data.put(batch, REVISION_KEY, revisionBytes); - if (ts != null) { - byte[] tsBytes = hybridTsToArray(ts); + byte[] tsBytes = hybridTsToArray(ts); - tsToRevision.put(batch, tsBytes, revisionBytes); - revisionToTs.put(batch, revisionBytes, tsBytes); - } + tsToRevision.put(batch, tsBytes, revisionBytes); + revisionToTs.put(batch, revisionBytes, tsBytes); + + validateNoChecksumConflict(newRev, newChecksum); + revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum)); db.write(defaultWriteOptions, batch); rev = newRev; - + checksum.commitRound(newChecksum); updatedEntries.ts = ts; queueWatchEvent(); notifyRevisionUpdate(); } + private void validateNoChecksumConflict(long newRev, long newChecksum) throws RocksDBException { + byte[] existingChecksumBytes = revisionToChecksum.get(longToBytes(newRev)); + + if (existingChecksumBytes != null) { + long existingChecksum = bytesToLong(existingChecksumBytes); + if (existingChecksum != newChecksum) { + throw new MetaStorageException(INTERNAL_ERR, "Metastorage revision checksum differs from a checksum for the same revision " Review Comment: I prefer to use `String#format`, but it's up to you. ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1625,6 +1704,22 @@ private void compactRevisionToTimestampAndViceVersa(long compactionRevision) thr revisionToTs.delete(batch, it.key()); tsToRevision.delete(batch, it.value()); + revisionToChecksum.delete(batch, it.key()); + + return true; + }); + } + + private void compactRevisionToChecksum(long compactionRevision) throws RocksDBException { Review Comment: Is this method used anywhere?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org