swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2081906483


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup 
directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new 
ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && 
++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = 
CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, 
compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log 
entry.", ex);
+          }
+
+          boolean shouldUpdateTable = false;
+          List<CompactionFileInfo> fileInfoList = 
compactionLogEntry.getInputFileInfoList();
+          List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+          for (CompactionFileInfo fileInfo : fileInfoList) {
+            if (fileInfo.isPruned()) {
+              updatedFileInfoList.add(fileInfo);
+              continue;
+            }
+            Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() 
+ ROCKSDB_SST_SUFFIX);
+            if (Files.notExists(sstFilePath)) {
+              LOG.debug("Skipping pruning SST file {} as it does not exist in 
backup directory.", sstFilePath);
+              updatedFileInfoList.add(fileInfo);
+              continue;
+            }
+
+            // Write the file.sst => pruned.sst.tmp
+            Files.deleteIfExists(prunedSSTFilePath);
+            try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader 
= new ManagedRawSSTFileReader<>(
+                     managedOptions, sstFilePath.toFile().getAbsolutePath(), 
SST_READ_AHEAD_SIZE);
+                 ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = 
sstFileReader.newIterator(
+                     keyValue -> Pair.of(keyValue.getKey(), 
keyValue.getType()), null, null)) {
+              sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+              while (itr.hasNext()) {
+                Pair<byte[], Integer> keyValue = itr.next();
+                if (keyValue.getValue() == 0) {
+                  sstFileWriter.delete(keyValue.getKey());
+                } else {
+                  sstFileWriter.put(keyValue.getKey(), new byte[0]);
+                }
+              }
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to write pruned entries 
for " + sstFilePath, ex);
+            } finally {
+              try {
+                sstFileWriter.finish();
+              } catch (RocksDBException ex) {
+                throw new RocksDatabaseException("Failed to close 
SSTFileWriter writing to path: "
+                    + prunedSSTFilePath, ex);
+              }
+            }
+
+            // Move file.sst.tmp to file.sst and replace existing file 
atomically
+            try (BootstrapStateHandler.Lock lock = 
getBootstrapStateLock().lock()) {
+              Files.move(prunedSSTFilePath, sstFilePath,
+                  StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING);
+            }
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+          }
+
+          // Update Compaction Log table. Track keys that need updating.
+          if (shouldUpdateTable) {
+            CompactionLogEntry.Builder builder = new 
CompactionLogEntry.Builder(
+                compactionLogEntry.getDbSequenceNumber(), 
compactionLogEntry.getCompactionTime(),
+                updatedFileInfoList, 
compactionLogEntry.getOutputFileInfoList());

Review Comment:
   I was suggesting that we add a `toBuilder()` method to the `CompactionLogEntry` class that copies this object into a builder; then we could simply replace `inputFileInfoList` in that builder with `updatedFileInfoList` instead of reconstructing the entry field by field here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to