swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2071511787


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -1889,6 +1909,94 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
     );
   }
 
+
+  /**
+   * Test that backup SST files are pruned on loading previous compaction logs.
+   */
+  @EnabledIfSystemProperty(named = ROCKS_TOOLS_NATIVE_PROPERTY, matches = "true")

Review Comment:
   We need not check whether the RocksTools native library is loaded; we can just
mock the raw SST file iterator construction. This test should be more of a unit
test.
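
   A rough sketch of mocking that construction, assuming mockito-inline's
mockConstruction is available on the test classpath (the differ instance and
stubIterator here are illustrative placeholders, not names from the PR):
   ```java
   try (MockedConstruction<ManagedRawSSTFileReader> mockedReader =
            Mockito.mockConstruction(ManagedRawSSTFileReader.class, (mock, context) ->
                // hand back a canned iterator so no native code runs
                when(mock.newIterator(any(), any(), any())).thenReturn(stubIterator))) {
     differ.pruneSstFileValues();
     // assert on pruned flags / table updates driven by the canned iterator
   }
   ```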



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;

Review Comment:
   Let's break out of this loop altogether in case of an exception.
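
   Something like this, where pruneEntry is a hypothetical helper that wraps the
read/rewrite/move for a single entry and throws on the first failure:
   ```java
   try {
     Pair<byte[], CompactionLogEntry> entry;
     while ((entry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
       pruneEntry(entry);
     }
   } catch (IOException | RocksDBException e) {
     // abandon the rest of the batch; the next scheduled run picks it up
     LOG.warn("SST value pruning failed, stopping this run.", e);
   }
   ```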



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (IOException | InterruptedException e) {
+            shouldReprocess = true;
+            continue;
+          } finally {
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);

Review Comment:
   How about using the same path for all the tmp files and deleting it once at
the start, before opening the sstFileWriter, instead of repeating this rollback
finally block in multiple places? Something like "pruned.sst.tmp".



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);

Review Comment:
   https://github.com/apache/ozone/pull/8277 



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -163,14 +175,18 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private final long maxAllowedTimeInDag;
   private final BootstrapStateHandler.Lock lock
       = new BootstrapStateHandler.Lock();
-
+  private static final int SST_READ_AHEAD_SIZE = 2 * 1024 * 1024;
+  private int pruneSSTFileBatchSize;
   private ColumnFamilyHandle snapshotInfoTableCFHandle;
   private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
   private AtomicBoolean suspended;
 
   private ColumnFamilyHandle compactionLogTableCFHandle;
   private ManagedRocksDB activeRocksDB;
   private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;
+  private boolean isNativeLibsLoaded = false;
+  private ConcurrentLinkedQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =

Review Comment:
   Keep this null or Optional.empty() by default and only initialize it if the
prune thread is going to run.
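
   For instance (sketch; the guard condition is whatever gates starting the
prune task):
   ```java
   private ConcurrentLinkedQueue<Pair<byte[], CompactionLogEntry>> pruneQueue = null;

   // in the init path, allocate only when the prune task will actually run:
   if (isNativeLibsLoaded) {
     pruneQueue = new ConcurrentLinkedQueue<>();
   }
   ```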



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java:
##########
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;

Review Comment:
   Why this change?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.DELETE;
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.RENAME;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;

Review Comment:
   Why is this change required?



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);

Review Comment:
   Why a RuntimeException? Convert the RocksDBException into a RocksDatabaseException.
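
   i.e. something like this, assuming the usual (message, cause) constructor on
RocksDatabaseException:
   ```java
   } catch (RocksDBException e) {
     throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, e);
   }
   ```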



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -526,12 +562,17 @@ public void onCompactionCompleted(RocksDB db,
             inflightCompactions.remove(inputFile);
           }
         }
+        // Add the compaction log entry to the prune queue
+        // so that the backup input sst files can be pruned.
+        if (isNativeLibsLoaded) {
+          pruneQueue.offer(Pair.of(key, compactionLogEntry));

Review Comment:
   Instead of a pair, just add the key to the queue. We need not keep the
compactionLogEntry in memory.
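
   Sketch of the key-only flow, re-reading the entry from the compaction log
table on the consumer side (the codec call is illustrative; whatever
deserialization the table already uses would apply):
   ```java
   // producer side, onCompactionCompleted:
   pruneQueue.offer(key);

   // consumer side, pruneSstFileValues():
   byte[] key = pruneQueue.poll();
   byte[] rawEntry = activeRocksDB.get().get(compactionLogTableCFHandle, key);
   CompactionLogEntry compactionLogEntry =
       CompactionLogEntry.getCodec().fromPersistedFormat(rawEntry);
   ```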



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -526,12 +562,17 @@ public void onCompactionCompleted(RocksDB db,
             inflightCompactions.remove(inputFile);
           }
         }
+        // Add the compaction log entry to the prune queue
+        // so that the backup input sst files can be pruned.
+        if (isNativeLibsLoaded) {

Review Comment:
   I don't like this logic being present in multiple places. We should just
check pruneQueue != null.
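
   i.e. (sketch, with the queue left null whenever pruning is disabled):
   ```java
   if (pruneQueue != null) {
     pruneQueue.offer(Pair.of(key, compactionLogEntry));
   }
   ```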



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (IOException | InterruptedException e) {
+            shouldReprocess = true;
+            continue;
+          } finally {
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);

Review Comment:
   Let us just abort the entire thread run if an exception is thrown. This
exception-handling logic convolutes the code, which IMO is unnecessary.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;

Review Comment:
   How about just peeking instead of polling at the start? We wouldn't need the
reprocess boolean then.
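
   Sketch, where processEntry is a placeholder for the per-entry work that
throws on failure, so a failed entry stays at the head of the queue for the
next run:
   ```java
   Pair<byte[], CompactionLogEntry> entry;
   while ((entry = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
     processEntry(entry);
     pruneQueue.poll();  // dequeue only after the entry fully succeeded
   }
   ```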



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

