ibessonov commented on code in PR #5670:
URL: https://github.com/apache/ignite-3/pull/5670#discussion_r2054582377


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1434,88 +1378,202 @@ public void clear() {
             createDb();
         } catch (Exception e) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to 
restore snapshot", e);
-        } finally {
-            rwLock.readLock().unlock();
         }
     }
 
     private void compactKeys(long compactionRevision) throws RocksDBException {
-        compactInBatches(index, (key, value, batch) -> {
-            compactForKey(batch, key, getAsLongs(value.get()), 
compactionRevision);
-
-            return true;
-        });
-    }
+        assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev);
 
-    private void compactAuxiliaryMappings(long compactionRevision) throws 
RocksDBException {
-        compactInBatches(revisionToTs, (key, value, batch) -> {
-            long revision = bytesToLong(key);
+        // Clear bloom filter before opening iterator, so that we don't have 
collisions right from the start.
+        synchronized (writeBatchProtector) {
+            writeBatchProtector.clear();
+        }
 
-            if (revision > compactionRevision) {
-                return false;
-            }
+        if (!busyLock.enterBusy()) {
+            return;
+        }
 
-            revisionToTs.delete(batch, key);
-            tsToRevision.delete(batch, value.get());
+        try (RocksIterator iterator = index.newIterator()) {
+            byte[] key = null;
+            List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE);
 
-            revisionToChecksum.delete(batch, key);
+            iterator.seekToFirst();
+            while (iterator.isValid()) {
+                try (WriteBatch batch = new WriteBatch()) {
+                    byte[] retryPositionKey = key;
 
-            return true;
-        });
-    }
+                    batchKeys.clear();
+                    for (int i = 0; i < COMPACT_BATCH_SIZE && 
iterator.isValid(); i++, iterator.next()) {
+                        if (stopCompaction.get()) {
+                            return;
+                        }
 
-    @FunctionalInterface
-    private interface CompactionAction {
-        /**
-         * Performs compaction on the storage at the current iterator pointer. 
Returns {@code true} if it is necessary to continue
-         * iterating, {@link false} if it is necessary to finish with writing 
the last batch.
-         */
-        boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch) 
throws RocksDBException;
-    }
+                        iterator.status();
 
-    private void compactInBatches(ColumnFamily columnFamily, CompactionAction 
compactionAction) throws RocksDBException {
-        try (RocksIterator iterator = columnFamily.newIterator()) {
-            boolean continueIterating = true;
+                        key = iterator.key();
 
-            byte[] key = null;
+                        compactForKey(batch, batchKeys, key, 
getAsLongs(iterator.value()), compactionRevision);
+                    }
 
-            while (continueIterating) {
-                rwLock.writeLock().lock();
+                    if (!writeCompactedBatch(batchKeys, batch)) {
+                        key = retryPositionKey;
 
-                try (WriteBatch batch = new WriteBatch()) {
-                    // We must refresh the iterator while holding write lock, 
because iterator state might be outdated due to its snapshot
-                    // isolation.
-                    if (!refreshIterator(iterator, key)) {
-                        break;
+                        // Refreshing the iterator is absolutely crucial. We 
have determined that data has been modified externally,
+                        // current snapshot in the iterator is invalid.
+                        refreshIterator(iterator, key);
                     }
+                }
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private void compactAuxiliaryMappings(long compactionRevision) throws 
RocksDBException {
+        assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
 
-                    
assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try (RocksIterator iterator = revisionToTs.newIterator()) {
+            boolean continueIterating = true;
+            iterator.seekToFirst();
 
+            while (continueIterating && iterator.isValid()) {
+                try (WriteBatch batch = new WriteBatch()) {
                     for (int i = 0; i < COMPACT_BATCH_SIZE && 
iterator.isValid(); i++, iterator.next()) {
                         if (stopCompaction.get()) {
                             return;
                         }
 
-                        key = iterator.key();
+                        iterator.status();
 
-                        if (!compactionAction.compact(key, iterator::value, 
batch)) {
+                        if (!deleteAuxiliaryMapping(compactionRevision, 
iterator, batch)) {
                             continueIterating = false;
 
                             break;
                         }
                     }
 
-                    db.write(writeOptions, batch);
-                } finally {
-                    rwLock.writeLock().unlock();
+                    db.write(WRITE_OPTIONS, batch);
+                }
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private boolean deleteAuxiliaryMapping(long compactionRevision, 
RocksIterator iterator, WriteBatch batch) throws RocksDBException {
+        byte[] key = iterator.key();
+        long revision = bytesToLong(key);
+
+        if (revision > compactionRevision) {
+            return false;
+        }
+
+        revisionToTs.delete(batch, key);
+        tsToRevision.delete(batch, iterator.value());
+
+        revisionToChecksum.delete(batch, key);
+
+        return true;
+    }
+
+    private volatile long lastCompactRangeStart = System.nanoTime();

Review Comment:
   Oops. I'm actually going to delete the entire "compactRange" machinery right now, and maybe investigate this topic separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to