hemantk-12 commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2053331834


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -237,8 +256,22 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
           this::pruneSstFiles,
           pruneCompactionDagDaemonRunIntervalInMs,
           pruneCompactionDagDaemonRunIntervalInMs,
-          TimeUnit.MILLISECONDS
-      );
+          TimeUnit.MILLISECONDS);
+
+      try {
+        isNativeLibsLoaded = ManagedRawSSTFileReader.loadLibrary();

Review Comment:
   1. Add a check if `ozone.om.snapshot.load.native.lib` is enabled.
   2. Do we need to call `ManagedRawSSTFileReader.loadLibrary()` here? It is already initialized in [SnapshotDiffManager#initNativeLibraryForEfficientDiff](https://github.com/apache/ozone/blob/5c91b44ad1d6e9c61073c3718cd62929b40ba2c0/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java#L287).
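
   A rough sketch of both points, loosely mirroring what `SnapshotDiffManager#initNativeLibraryForEfficientDiff` does (the config key is spelled out as a string literal here, and how the conf reaches this class is an open question):

   ```java
   // Only attempt to load the native lib when the feature flag is on.
   // The default of false and the `configuration` field are assumptions.
   if (configuration.getBoolean("ozone.om.snapshot.load.native.lib", false)) {
     try {
       isNativeLibsLoaded = ManagedRawSSTFileReader.loadLibrary();
     } catch (NativeLibraryNotLoadedException e) {
       LOG.warn("Native library for raw SST reading could not be loaded; "
           + "skipping SST value pruning.", e);
       isNativeLibsLoaded = false;
     }
   }
   ```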



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -843,9 +889,16 @@ public synchronized Optional<List<String>> getSSTDiffListWithFullPath(DifferSnap
              String sstFullPath = getSSTFullPath(sst, src.getDbPath(), dest.getDbPath());
              Path link = Paths.get(sstFilesDirForSnapDiffJob,
                  sst + SST_FILE_EXTENSION);
-              Path srcFile = Paths.get(sstFullPath);
-              createLink(link, srcFile);
-              return link.toString();
+              // Take a read lock on the SST FILE
+              ReentrantReadWriteLock.ReadLock sstReadLock = getSSTFileLock(sstFullPath).readLock();

Review Comment:
   IMO, we don't need a file-level granularity lock. We can define a simple lock in `RocksDBCheckpointDiffer` and use that.
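
   A minimal sketch of what I mean, reusing the calls from this diff with a single coarse lock (names illustrative):

   ```java
   // One lock owned by RocksDBCheckpointDiffer, guarding the backup SST files.
   private final ReentrantReadWriteLock sstPruneLock = new ReentrantReadWriteLock();

   // Snap-diff side: hold the read lock while hard-linking the SST file.
   sstPruneLock.readLock().lock();
   try {
     createLink(link, Paths.get(sstFullPath));
   } finally {
     sstPruneLock.readLock().unlock();
   }

   // Pruner side: hold the write lock only around the atomic move.
   sstPruneLock.writeLock().lock();
   try {
     Files.move(prunedSSTFile.toPath(), sstFile.toPath(),
         StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
   } finally {
     sstPruneLock.writeLock().unlock();
   }
   ```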



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedOptions managedOptions = new ManagedOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();

Review Comment:
   `Files.createFile` will throw `FileAlreadyExistsException` if the file already exists, which is possible if there was a failure in the previous run. Handle the file-exists scenario.
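
   One way to handle it, reusing the names from this diff (sketch): remove any stale leftover before creating the temp file.

   ```java
   // A .tmp file can survive a crash in a previous run; delete it first so
   // Files.createFile does not throw FileAlreadyExistsException.
   Path tmpPath = sstBackupDirPath.resolve(sstFile.getName() + ".tmp");
   Files.deleteIfExists(tmpPath);
   File prunedSSTFile = Files.createFile(tmpPath).toFile();
   ```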



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedOptions managedOptions = new ManagedOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {

Review Comment:
   We should bound this loop to at most the size the queue was initialized with. Otherwise, entries re-queued on failure could be reprocessed within the same run.
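
   One concrete way to bound it (sketch; this caps at the queue's size when the task starts, since failed entries are put back via `pruneQueue.put(...)`):

   ```java
   // Snapshot the size once so re-queued failures wait for the next run.
   int entriesToProcess = pruneQueue.size();
   for (int i = 0; i < entriesToProcess; i++) {
     Pair<byte[], CompactionLogEntry> compactionLogTableEntry = pruneQueue.poll();
     if (compactionLogTableEntry == null) {
       break;
     }
     // ... existing pruning logic ...
   }
   ```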



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedOptions managedOptions = new ManagedOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();
+          ReentrantReadWriteLock.WriteLock sstWriteLock = getSSTFileLock(sstFile.getAbsolutePath()).writeLock();
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFile.getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+
+            sstFileWriter.open(prunedSSTFile.getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+            sstFileWriter.finish();
+
+            // Take a write lock on the SST File
+            sstWriteLock.lock();
+            // Move file.sst.tmp to file.sst and replace existing file atomically
+            Files.move(prunedSSTFile.toPath(), sstFile.toPath(),
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (Exception e) {
+            pruneQueue.put(compactionLogTableEntry);
+            LOG.error("Could not prune OMKeyInfo from {}. Reprocessing.", sstFileName, e);
+            continue;
+          } finally {
+            Files.deleteIfExists(prunedSSTFile.toPath());
+            if (sstWriteLock.isHeldByCurrentThread()) {
+              sstWriteLock.unlock();
+            }
+          }
+
+          // Put pruned flag in compaction DAG. Update CompactionNode in Map.
+          compactionNodeMap.get(sstFileName).setPruned();

Review Comment:
   Why do we need to update the `compactionNode`? A file will be pruned only once during the life cycle of the process, whether it was added by the compaction listener or by the OM start-up process.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedOptions managedOptions = new ManagedOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();
+          ReentrantReadWriteLock.WriteLock sstWriteLock = getSSTFileLock(sstFile.getAbsolutePath()).writeLock();
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFile.getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+
+            sstFileWriter.open(prunedSSTFile.getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+            sstFileWriter.finish();
+
+            // Take a write lock on the SST File
Review Comment:
   nit: no need to add this comment or the next one. 



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -171,6 +185,9 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private ColumnFamilyHandle compactionLogTableCFHandle;
   private ManagedRocksDB activeRocksDB;
   private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;
+  private boolean isNativeLibsLoaded;
+  private BlockingQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =
+      new LinkedBlockingQueue<Pair<byte[], CompactionLogEntry>>();

Review Comment:
   We should limit the `BlockingQueue` capacity to a few thousand entries.
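
   For example (the capacity value is just a placeholder):

   ```java
   private static final int PRUNE_QUEUE_CAPACITY = 5000;
   private final BlockingQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =
       new LinkedBlockingQueue<>(PRUNE_QUEUE_CAPACITY);
   ```

   Note that once the queue is bounded, `put()` in the compaction listener will block when it is full, so a non-blocking `offer()` (dropping or deferring the entry) may be the better fit there.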



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1140,7 +1193,17 @@ private void addFileInfoToCompactionLogTable(
       builder.setCompactionReason(compactionReason);
     }
 
-    addToCompactionLogTable(builder.build());
+    CompactionLogEntry compactionLogEntry = builder.build();
+    byte[] key = addToCompactionLogTable(compactionLogEntry);
+    // Add the compaction log entry to the prune queue so that the backup input sst files can be pruned.
+    if (isNativeLibsLoaded) {
+      try {
+        pruneQueue.put(Pair.of(key, compactionLogEntry));
+      } catch (InterruptedException e) {
+        LOG.error("Could not add backup SST files in queue to be pruned. {}", compactionLogEntry.getInputFileInfoList(), e);
+      }
+    }

Review Comment:
   I'd suggest moving this code [here](https://github.com/apache/ozone/blob/73e0d2a38399ddb004426e2276c6ff024bc49220/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java#L815). Otherwise, we will miss new entries from the table.
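
   A hypothetical sketch of the enqueue at that spot, assuming it sits where compaction log table entries are iterated (`offer()` is used on the assumption the queue becomes bounded, per the comment above):

   ```java
   if (isNativeLibsLoaded) {
     // Non-blocking enqueue; a full queue just defers pruning to a later run.
     if (!pruneQueue.offer(Pair.of(key, compactionLogEntry))) {
       LOG.warn("Prune queue is full; skipping {} for now.",
           compactionLogEntry.getInputFileInfoList());
     }
   }
   ```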



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

