rpuch commented on code in PR #5670: URL: https://github.com/apache/ignite-3/pull/5670#discussion_r2053903348
########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -253,28 +200,22 @@ public long getCompactionRevision() { public void updateCompactionRevision(long compactionRevision, KeyValueUpdateContext context) { assert compactionRevision >= 0 : compactionRevision; - rwLock.writeLock().lock(); + assertCompactionRevisionLessThanCurrent(compactionRevision, rev); - try { - assertCompactionRevisionLessThanCurrent(compactionRevision, rev); + saveCompactionRevision(compactionRevision, context, false); - saveCompactionRevision(compactionRevision, context, false); + if (isInRecoveryState()) { + setCompactionRevision(compactionRevision); + } else if (compactionRevision > planedUpdateCompactionRevision) { Review Comment: ```suggestion } else if (compactionRevision > plannedUpdateCompactionRevision) { ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -262,12 +272,11 @@ private enum RecoveryStatus { /** * Write options used to write to RocksDB. * - * <p>Access is guarded by {@link #rwLock}. + * <p>Metastorage recovery is based on the snapshot & external log. WAL is never used for recovery, and can be safely disabled. */ - private WriteOptions writeOptions; + private static final WriteOptions WRITE_OPTIONS = new WriteOptions().setDisableWAL(true); Review Comment: If it's static, we cannot close it, which looks like a (small) resource leak ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1004,7 +949,7 @@ public void compact(long revision) { // Compaction might have created a lot of tombstones in column families, which affect scan speed. Removing them makes next // compaction faster, as well as other scans in general. Review Comment: ```suggestion // Metastorage compaction faster, as well as other scans in general. ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1004,7 +949,7 @@ public void compact(long revision) { // Compaction might have created a lot of tombstones in column families, which affect scan speed. Removing them makes next Review Comment: ```suggestion // Metastorage compaction might have created a lot of tombstones in column families, which affect scan speed. Removing them makes next ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1434,88 +1378,202 @@ public void clear() { createDb(); } catch (Exception e) { throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to restore snapshot", e); - } finally { - rwLock.readLock().unlock(); } } private void compactKeys(long compactionRevision) throws RocksDBException { - compactInBatches(index, (key, value, batch) -> { - compactForKey(batch, key, getAsLongs(value.get()), compactionRevision); - - return true; - }); - } + assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev); - private void compactAuxiliaryMappings(long compactionRevision) throws RocksDBException { - compactInBatches(revisionToTs, (key, value, batch) -> { - long revision = bytesToLong(key); + // Clear bloom filter before opening iterator, so that we don't have collisions right from the start. 
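
To make the cooperation between the compaction thread and the FSM thread easier to follow, here is a minimal self-contained sketch of the protocol this diff implements. All names are illustrative, and an exact `Set` stands in for the PR's bloom-filter-based `WriteBatchProtector`:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch only; the PR uses a fixed-size bloom filter instead of an exact set. */
class WriteProtectorSketch {
    private final Set<String> dirtyKeys = new HashSet<>();

    /** FSM-thread side: record every key touched by a write before committing it. */
    synchronized void onUpdate(String key) {
        dirtyKeys.add(key);
    }

    /** Compaction-thread side: commit the batch only if none of its keys were updated concurrently. */
    synchronized boolean tryWriteBatch(List<String> batchKeys, Runnable commit) {
        for (String key : batchKeys) {
            if (dirtyKeys.contains(key)) {
                dirtyKeys.clear(); // reset so the retry is not blocked by stale marks
                return false;      // caller refreshes its iterator and retries the batch
            }
        }
        commit.run();
        return true;
    }

    /** Called once before compaction opens its iterator, mirroring the clear() right below. */
    synchronized void clear() {
        dirtyKeys.clear();
    }
}
```

With a bloom filter in place of the exact set, `tryWriteBatch` can fail spuriously on a hash collision; that only costs a retry, never correctness.
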
+ synchronized (writeBatchProtector) { + writeBatchProtector.clear(); + } - if (revision > compactionRevision) { - return false; - } + if (!busyLock.enterBusy()) { + return; + } - revisionToTs.delete(batch, key); - tsToRevision.delete(batch, value.get()); + try (RocksIterator iterator = index.newIterator()) { + byte[] key = null; + List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE); - revisionToChecksum.delete(batch, key); + iterator.seekToFirst(); + while (iterator.isValid()) { + try (WriteBatch batch = new WriteBatch()) { + byte[] retryPositionKey = key; - return true; - }); - } + batchKeys.clear(); + for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { + if (stopCompaction.get()) { + return; + } - @FunctionalInterface - private interface CompactionAction { - /** - * Performs compaction on the storage at the current iterator pointer. Returns {@code true} if it is necessary to continue - * iterating, {@link false} if it is necessary to finish with writing the last batch. - */ - boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch) throws RocksDBException; - } + iterator.status(); - private void compactInBatches(ColumnFamily columnFamily, CompactionAction compactionAction) throws RocksDBException { - try (RocksIterator iterator = columnFamily.newIterator()) { - boolean continueIterating = true; + key = iterator.key(); - byte[] key = null; + compactForKey(batch, batchKeys, key, getAsLongs(iterator.value()), compactionRevision); + } - while (continueIterating) { - rwLock.writeLock().lock(); + if (!writeCompactedBatch(batchKeys, batch)) { + key = retryPositionKey; - try (WriteBatch batch = new WriteBatch()) { - // We must refresh the iterator while holding write lock, because iterator state might be outdated due to its snapshot - // isolation. - if (!refreshIterator(iterator, key)) { - break; + // Refreshing the iterator is absolutely crucial. We have determined that data has been modified externally, + // current snapshot in the iterator is invalid. 
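
For context on the call that follows, a plausible shape of such a refresh helper, assuming RocksDB's Java binding (`RocksIterator#refresh()` drops the stale snapshot but leaves the iterator unpositioned, so it must be re-seeked explicitly); the PR's actual helper may differ:

```java
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

class IteratorRefreshSketch {
    // Hypothetical helper: take a fresh snapshot and resume from the last known position.
    static void refreshAndReposition(RocksIterator iterator, byte[] resumeKey) throws RocksDBException {
        iterator.refresh(); // drops the stale snapshot; the iterator must then be re-seeked

        if (resumeKey == null) {
            iterator.seekToFirst(); // nothing processed yet: start from the beginning
        } else {
            iterator.seek(resumeKey); // otherwise continue from the recorded retry position
        }
    }
}
```
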
+ refreshIterator(iterator, key); } + } + } + } finally { + busyLock.leaveBusy(); + } + } + + private void compactAuxiliaryMappings(long compactionRevision) throws RocksDBException { + assertCompactionRevisionLessThanCurrent(compactionRevision, rev); - assertCompactionRevisionLessThanCurrent(compactionRevision, rev); + if (!busyLock.enterBusy()) { + return; + } + + try (RocksIterator iterator = revisionToTs.newIterator()) { + boolean continueIterating = true; + iterator.seekToFirst(); + while (continueIterating && iterator.isValid()) { + try (WriteBatch batch = new WriteBatch()) { for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { if (stopCompaction.get()) { return; } - key = iterator.key(); + iterator.status(); - if (!compactionAction.compact(key, iterator::value, batch)) { + if (!deleteAuxiliaryMapping(compactionRevision, iterator, batch)) { continueIterating = false; break; } } - db.write(writeOptions, batch); - } finally { - rwLock.writeLock().unlock(); + db.write(WRITE_OPTIONS, batch); + } + } + } finally { + busyLock.leaveBusy(); + } + } + + private boolean deleteAuxiliaryMapping(long compactionRevision, RocksIterator iterator, WriteBatch batch) throws RocksDBException { + byte[] key = iterator.key(); + long revision = bytesToLong(key); + + if (revision > compactionRevision) { + return false; + } + + revisionToTs.delete(batch, key); + tsToRevision.delete(batch, iterator.value()); + + revisionToChecksum.delete(batch, key); + + return true; + } + + private volatile long lastCompactRangeStart = System.nanoTime(); + + private void compactColumnFamilies() throws RocksDBException { + if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - lastCompactRangeStart) >= 1) { + lastCompactRangeStart = System.nanoTime(); + + if (!busyLock.enterBusy()) { + return; + } + + try { + db.compactRange(); + } finally { + busyLock.leaveBusy(); + } + } + } + + /** + * Writes the batch to the database in the {@code FSM} thread. By the time this method is called, {@link #updatedEntries} must be in a + * state that corresponds to the current batch. This collection will be used to mark the entries as updated in + * {@link #writeBatchProtector}. + * + * <p>No other actions besides calling {@link WriteBatchProtector#onUpdate(byte[])} and {@link RocksDB#write(WriteOptions, WriteBatch)} + * are performed here. They're both executed while holding {@link #writeBatchProtector}'s monitor in order to synchronise + * {@link #writeBatchProtector} modifications. + * + * <p>Since there's a gap between reading the data and writing batch into the storage, it is possible that compaction thread had + * concurrently updated the data that we're modifying. The only real side effect of such a race will be a presence of already deleted + * revisions in revisions list, associated with any particular key (see {@link RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}. + * + * <p>We ignore this side effect, because retrying the operation in {@code FSM} thread is too expensive, and because there's really no + * harm in having some obsolete revisions there. We always keep in mind that a particular revision might already be compacted when we + * read data. + * + * @param batch RockDB's {@link WriteBatch}. + * @throws RocksDBException If {@link RocksDB#write(WriteOptions, WriteBatch)} threw an exception. 
+ * @see #writeCompactedBatch(List, WriteBatch) + */ + private void writeBatch(WriteBatch batch) throws RocksDBException { + synchronized (writeBatchProtector) { + for (Entry updatedEntry : updatedEntries.updatedEntries) { + writeBatchProtector.onUpdate(updatedEntry.key()); + } + + //noinspection AccessToStaticFieldLockedOnInstance + db.write(WRITE_OPTIONS, batch); + } + } + + /** + * Writes the batch to the database in the compaction thread. + * + * <p>No other actions besides accessing {@link #writeBatchProtector} and calling {@link RocksDB#write(WriteOptions, WriteBatch)} + * are performed here. They're both executed while holding {@link #writeBatchProtector}'s monitor in order to synchronise potential + * {@link #writeBatchProtector} modifications. + * + * <p>Since there's a gap between reading the data and writing batch into the storage, it is possible that {@code FSM} thread had + * concurrently updated the data that we're compacting. Such a race would mean that there are unaccounted revisions in revisions list, + * associated with keys from {@code batchKeys} (see {@link RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}. + * + * <p>For that reason, unlike {@link #writeBatch(WriteBatch)}, here we don't have a privilege of just writing the batch in case of race. + * It would lead to data loss. In order to avoid it, we probabilistically check if data that we're compacting has been modified + * concurrently. If it certainly wasn't, we proceed and return {@code true}. If we're not sure, we abort this batch and return + * {@code false}. + * + * <p>If we return {@code false}, we also clear the {@link #writeBatchProtector} to avoid blocking the next batch. It is important to + * remember that we'll get false-negative results sometimes if we have hash collisions. That's fine. + * + * @param batchKeys Meta-storage keys that have been compacted in this batch. + * @param batch RockDB's {@link WriteBatch}. + * @return {@code true} if writing succeeded, {@code false} if compaction round must be retried doe to concurrent storage update. Review Comment: ```suggestion * @return {@code true} if writing succeeded, {@code false} if compaction round must be retried due to concurrent storage update. ``` ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java: ########## @@ -48,56 +50,82 @@ public class ReadOperationForCompactionTracker { private final AtomicLong longOperationIdGenerator = new AtomicLong(); - /** Generates the next read operation ID. Thread-safe. */ - public long generateReadOperationId() { - return longOperationIdGenerator.getAndIncrement(); + /** + * Token to stop tracking the read operation. + * + * @see #track(long, LongSupplier, LongSupplier) + */ + @FunctionalInterface + public interface TrackingToken extends AutoCloseable { + @Override + void close(); } /** * Starts tracking the completion of a read operation on its lowest estimation for revision upper bound. * - * <p>Method is expected not to be called more than once for the same arguments.</p> - * * <p>Expected usage pattern:</p> * <pre><code> - * Object readOperationId = ...; * int operationRevision = ...; * - * tracker.track(readOperationId, operationRevision); - * - * try { + * try (var token = tracker.track(operationRevision, storage::revision, storage::getCompactionRevision)) { Review Comment: And here it is used in a `try-with-resources`. 
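
For readers following the thread, a compilable sketch (names illustrative) of why this pattern works: redeclaring `close()` without `throws Exception` is what lets callers use the token in try-with-resources with no catch block:

```java
@FunctionalInterface
interface TokenSketch extends AutoCloseable {
    @Override
    void close(); // narrows AutoCloseable#close() by dropping the checked exception
}

class TokenDemo {
    static void readWithTracking(Runnable read, Runnable untrack) {
        try (TokenSketch token = untrack::run) { // a lambda is enough: the token is just a cleanup action
            read.run();
        } // token.close() runs here even if read() throws
    }
}
```
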
I'm confused ########## modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java: ########## @@ -70,6 +73,8 @@ * Simple in-memory key/value storage for tests. */ public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage { + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); Review Comment: Why did you have to add this lock? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -229,8 +243,6 @@ public class RocksDbKeyValueStorage extends AbstractKeyValueStorage { /** * Facility to work with checksums. - * - * <p>Multi-threaded access is guarded by {@link #rwLock}.</p> */ private MetastorageChecksum checksum; Review Comment: Should it be made volatile? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -489,12 +490,7 @@ public void close() throws Exception { IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); - rwLock.writeLock().lock(); - try { - closeRocksResources(); - } finally { - rwLock.writeLock().unlock(); - } + closeRocksResources(); Review Comment: Is there a guarantee that start and stop will not coincide? Would it make sense to still have an RW lock, acquire write lock during start and stop, acquire read lock during 'normal' operations? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server.persistence; + +import java.util.Arrays; +import org.apache.ignite.internal.util.HashUtils; + +/** + * Simple bloom-filter-based utility class to be used to cooperate compaction thread with an FSM thread. For a full explanation please + * consider reading comments in the {@link RocksDbKeyValueStorage#compact(long)} implementation. + */ +class WriteBatchProtector { + /** + * Signifies that there are 2^14 bits in this bloom filter, which is roughly equal to 16 thousand. This is a reasonably large value. + */ + private static final int BLOOM_BITS = 14; + private static final int BLOOM_MASK = (1 << BLOOM_BITS) - 1; + + // Constants for least significant bits (LSB) of hash values. These bits would point to individual bit positions in "long" values. + private static final int LSB_BITS = Integer.numberOfTrailingZeros(Long.SIZE); + private static final int LSB_MASK = Long.SIZE - 1; + + /** Bit-set. */ + private final long[] bloom = new long[1 << BLOOM_BITS >>> LSB_BITS]; + + /** + * Called when {@code key} is updated in the storage. 
Immediate consecutive call of {@code maybeUpdated(key)} will return {@code true}. + */ + public void onUpdate(byte[] key) { + int h = hash(key); + + int msb = h >>> LSB_BITS; + int lsb = h & LSB_MASK; + bloom[msb] |= 1L << lsb; Review Comment: Would it make sense to extract expressions in these 3 lines to methods? Current duplication is tiny, but methods will make it explicit that expressions are same ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1434,88 +1378,202 @@ public void clear() { createDb(); } catch (Exception e) { throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to restore snapshot", e); - } finally { - rwLock.readLock().unlock(); } } private void compactKeys(long compactionRevision) throws RocksDBException { - compactInBatches(index, (key, value, batch) -> { - compactForKey(batch, key, getAsLongs(value.get()), compactionRevision); - - return true; - }); - } + assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev); - private void compactAuxiliaryMappings(long compactionRevision) throws RocksDBException { - compactInBatches(revisionToTs, (key, value, batch) -> { - long revision = bytesToLong(key); + // Clear bloom filter before opening iterator, so that we don't have collisions right from the start. + synchronized (writeBatchProtector) { + writeBatchProtector.clear(); + } - if (revision > compactionRevision) { - return false; - } + if (!busyLock.enterBusy()) { + return; + } - revisionToTs.delete(batch, key); - tsToRevision.delete(batch, value.get()); + try (RocksIterator iterator = index.newIterator()) { + byte[] key = null; + List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE); - revisionToChecksum.delete(batch, key); + iterator.seekToFirst(); + while (iterator.isValid()) { + try (WriteBatch batch = new WriteBatch()) { + byte[] retryPositionKey = key; - return true; - }); - } + batchKeys.clear(); + for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { + if (stopCompaction.get()) { + return; + } - @FunctionalInterface - private interface CompactionAction { - /** - * Performs compaction on the storage at the current iterator pointer. Returns {@code true} if it is necessary to continue - * iterating, {@link false} if it is necessary to finish with writing the last batch. - */ - boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch) throws RocksDBException; - } + iterator.status(); - private void compactInBatches(ColumnFamily columnFamily, CompactionAction compactionAction) throws RocksDBException { - try (RocksIterator iterator = columnFamily.newIterator()) { - boolean continueIterating = true; + key = iterator.key(); - byte[] key = null; + compactForKey(batch, batchKeys, key, getAsLongs(iterator.value()), compactionRevision); + } - while (continueIterating) { - rwLock.writeLock().lock(); + if (!writeCompactedBatch(batchKeys, batch)) { + key = retryPositionKey; - try (WriteBatch batch = new WriteBatch()) { - // We must refresh the iterator while holding write lock, because iterator state might be outdated due to its snapshot - // isolation. - if (!refreshIterator(iterator, key)) { - break; + // Refreshing the iterator is absolutely crucial. We have determined that data has been modified externally, + // current snapshot in the iterator is invalid. 
+ refreshIterator(iterator, key); } + } + } + } finally { + busyLock.leaveBusy(); + } + } + + private void compactAuxiliaryMappings(long compactionRevision) throws RocksDBException { + assertCompactionRevisionLessThanCurrent(compactionRevision, rev); - assertCompactionRevisionLessThanCurrent(compactionRevision, rev); + if (!busyLock.enterBusy()) { + return; + } + + try (RocksIterator iterator = revisionToTs.newIterator()) { + boolean continueIterating = true; + iterator.seekToFirst(); + while (continueIterating && iterator.isValid()) { + try (WriteBatch batch = new WriteBatch()) { for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { if (stopCompaction.get()) { return; } - key = iterator.key(); + iterator.status(); - if (!compactionAction.compact(key, iterator::value, batch)) { + if (!deleteAuxiliaryMapping(compactionRevision, iterator, batch)) { continueIterating = false; break; } } - db.write(writeOptions, batch); - } finally { - rwLock.writeLock().unlock(); + db.write(WRITE_OPTIONS, batch); + } + } + } finally { + busyLock.leaveBusy(); + } + } + + private boolean deleteAuxiliaryMapping(long compactionRevision, RocksIterator iterator, WriteBatch batch) throws RocksDBException { + byte[] key = iterator.key(); + long revision = bytesToLong(key); + + if (revision > compactionRevision) { + return false; + } + + revisionToTs.delete(batch, key); + tsToRevision.delete(batch, iterator.value()); + + revisionToChecksum.delete(batch, key); + + return true; + } + + private volatile long lastCompactRangeStart = System.nanoTime(); Review Comment: Please move this declaration to the beginning of the class (so that all fields are declared together) ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java: ########## @@ -48,56 +50,82 @@ public class ReadOperationForCompactionTracker { private final AtomicLong longOperationIdGenerator = new AtomicLong(); - /** Generates the next read operation ID. Thread-safe. */ - public long generateReadOperationId() { - return longOperationIdGenerator.getAndIncrement(); + /** + * Token to stop tracking the read operation. + * + * @see #track(long, LongSupplier, LongSupplier) + */ + @FunctionalInterface + public interface TrackingToken extends AutoCloseable { + @Override + void close(); } /** * Starts tracking the completion of a read operation on its lowest estimation for revision upper bound. * - * <p>Method is expected not to be called more than once for the same arguments.</p> - * * <p>Expected usage pattern:</p> * <pre><code> - * Object readOperationId = ...; * int operationRevision = ...; * - * tracker.track(readOperationId, operationRevision); - * - * try { + * try (var token = tracker.track(operationRevision, storage::revision, storage::getCompactionRevision)) { * doReadOperation(...); - * } finally { - * tracker.untrack(readOperationId, operationRevision); * } * </code></pre> * - * @see #untrack(Object, long) + * @see TrackingToken */ - public void track(Object readOperationId, long operationRevision) { - var key = new ReadOperationKey(readOperationId, operationRevision); + public TrackingToken track( + long operationRevision, LongSupplier latestRevision, LongSupplier compactedRevision + ) throws CompactedException { + long operationId = longOperationIdGenerator.getAndIncrement(); + + while (true) { + // "operationRevision" parameter is immutable, because we might need it on a next iteration. 
+ // If it is asked to track the latest revision, we receive it right here from the corresponding supplier. + long currentOperationRevision = operationRevision == MetaStorageManager.LATEST_REVISION + ? latestRevision.getAsLong() + : operationRevision; + + // Value from compacted revision supplier can only grow. We only use it for upper bound checks, so it's safe to read it every + // time instead of caching it. It applies to all usages of the supplier. + if (currentOperationRevision <= compactedRevision.getAsLong()) { + // Latest revision can never be compacted. If for some reason it already is, we should retry until it is not. Review Comment: What could be the reason if it can never happen? Does this need a rephrasing? ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java: ########## @@ -411,49 +346,41 @@ private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) { @Override public void advanceSafeTime(KeyValueUpdateContext context) { - rwLock.writeLock().lock(); + setIndexAndTerm(context.index, context.term); - try { - setIndexAndTerm(context.index, context.term); - - if (areWatchesStarted()) { - watchProcessor.advanceSafeTime(() -> {}, context.timestamp); - } - } finally { - rwLock.writeLock().unlock(); + if (areWatchesStarted()) { + watchProcessor.advanceSafeTime(() -> {}, context.timestamp); } } @Override public Revisions revisions() { - rwLock.readLock().lock(); - - try { - return createCurrentRevisions(); - } finally { - rwLock.readLock().unlock(); - } + return createCurrentRevisions(); } private Revisions createCurrentRevisions() { return new Revisions(rev, compactionRevision); } protected void notifyWatchProcessor(NotifyWatchProcessorEvent event) { - if (areWatchesStarted()) { - event.notify(watchProcessor); - } else { - boolean added = notifyWatchProcessorEventsBeforeStartingWatches.add(event); + synchronized (watchProcessorMutex) { + if (areWatchesStarted()) { + event.notify(watchProcessor); + } else { + boolean added = notifyWatchProcessorEventsBeforeStartingWatches.add(event); - assert added : event; + assert added : event; + } } } - protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() { - assert !areWatchesStarted(); + protected synchronized void drainNotifyWatchProcessorEventsBeforeStartingWatches() { Review Comment: Why is the method itself synchronized? ########## modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java: ########## @@ -959,6 +960,44 @@ void testConcurrentReadAndCompaction() { } } + @Test + void testConcurrentReadAndCompaction2() { Review Comment: Is it possible to invent a description of this test specifics in a couple of words instead of a number? 
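For example, a scenario-describing name such as `testReadSucceedsWhileCompactionRetriesBatch()` (hypothetical; the exact wording should reflect what the test actually pins down) would carry the intent on its own.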
Numbers don't tell anything ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1434,88 +1378,202 @@ public void clear() { createDb(); } catch (Exception e) { throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to restore snapshot", e); - } finally { - rwLock.readLock().unlock(); } } private void compactKeys(long compactionRevision) throws RocksDBException { - compactInBatches(index, (key, value, batch) -> { - compactForKey(batch, key, getAsLongs(value.get()), compactionRevision); - - return true; - }); - } + assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev); - private void compactAuxiliaryMappings(long compactionRevision) throws RocksDBException { - compactInBatches(revisionToTs, (key, value, batch) -> { - long revision = bytesToLong(key); + // Clear bloom filter before opening iterator, so that we don't have collisions right from the start. + synchronized (writeBatchProtector) { + writeBatchProtector.clear(); + } - if (revision > compactionRevision) { - return false; - } + if (!busyLock.enterBusy()) { + return; + } - revisionToTs.delete(batch, key); - tsToRevision.delete(batch, value.get()); + try (RocksIterator iterator = index.newIterator()) { + byte[] key = null; + List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE); - revisionToChecksum.delete(batch, key); + iterator.seekToFirst(); + while (iterator.isValid()) { + try (WriteBatch batch = new WriteBatch()) { + byte[] retryPositionKey = key; - return true; - }); - } + batchKeys.clear(); + for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { + if (stopCompaction.get()) { + return; + } - @FunctionalInterface - private interface CompactionAction { - /** - * Performs compaction on the storage at the current iterator pointer. Returns {@code true} if it is necessary to continue - * iterating, {@link false} if it is necessary to finish with writing the last batch. - */ - boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch) throws RocksDBException; - } + iterator.status(); Review Comment: Do we make this call to trigger an exception if the status is not successful? If so, please add a comment about this ########## modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java: ########## @@ -1674,8 +1740,9 @@ protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) { long revision = keyRevisions[maxRevisionIndex]; Value value = getValueForOperationNullable(key, revision); - // Value may be null if the compaction has removed it in parallel. - if (value == null || value.tombstone()) { + assert value != null : "key=" + toUtf8String(key) + ", revision=" + revision + ", compactionRevision=" + compactionRevision; Review Comment: Why can't it be null here?