hemantk-12 commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2053331834
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -237,8 +256,22 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
         this::pruneSstFiles,
         pruneCompactionDagDaemonRunIntervalInMs,
         pruneCompactionDagDaemonRunIntervalInMs,
-        TimeUnit.MILLISECONDS
-    );
+        TimeUnit.MILLISECONDS);
+
+    try {
+      isNativeLibsLoaded = ManagedRawSSTFileReader.loadLibrary();

Review Comment:
   1. Add a check if `ozone.om.snapshot.load.native.lib` is enabled.
   2. Can we load `ManagedRawSSTFileReader.loadLibrary()` here? It is already initialized in [SnapshotDiffManager#initNativeLibraryForEfficientDiff](https://github.com/apache/ozone/blob/5c91b44ad1d6e9c61073c3718cd62929b40ba2c0/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java#L287).

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -843,9 +889,16 @@ public synchronized Optional<List<String>> getSSTDiffListWithFullPath(DifferSnap
           String sstFullPath = getSSTFullPath(sst, src.getDbPath(), dest.getDbPath());
           Path link = Paths.get(sstFilesDirForSnapDiffJob, sst + SST_FILE_EXTENSION);
-          Path srcFile = Paths.get(sstFullPath);
-          createLink(link, srcFile);
-          return link.toString();
+          // Take a read lock on the SST FILE
+          ReentrantReadWriteLock.ReadLock sstReadLock = getSSTFileLock(sstFullPath).readLock();

Review Comment:
   IMO, we don't need a lock at file-level granularity. We can define a simple lock in RocksDBCheckpointDiffer and use that.
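   For illustration, a minimal sketch of that suggestion, assuming one class-level lock guards both the hard-link creation and the prune/replace step (the field name `sstPruneLock` is hypothetical, not from the PR):

```java
// A single differ-wide lock instead of a per-file lock map (name is illustrative).
private final ReentrantReadWriteLock sstPruneLock = new ReentrantReadWriteLock();

// Reader side, e.g. in getSSTDiffListWithFullPath(): guard the hard-link creation.
sstPruneLock.readLock().lock();
try {
  createLink(link, Paths.get(sstFullPath));
  return link.toString();
} finally {
  sstPruneLock.readLock().unlock();
}

// Writer side, e.g. in pruneSstFileValues(): guard the atomic replace of the backup SST file.
sstPruneLock.writeLock().lock();
try {
  Files.move(prunedSSTFile.toPath(), sstFile.toPath(),
      StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
} finally {
  sstPruneLock.writeLock().unlock();
}
```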
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedOptions managedOptions = new ManagedOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();

Review Comment:
   `Files.createFile` will throw `FileAlreadyExistsException` if the file already exists, which is possible if there was a failure in the previous run. Handle the file exists scenario.
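   One possible way to handle a leftover temp file, assuming it is safe to discard a stale `.tmp` from a previous failed run (variable names mirror the PR's code):

```java
// Write the file.sst => file.sst.tmp, tolerating a stale temp file left
// behind by a previous failed run instead of failing with
// FileAlreadyExistsException.
Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFile.getName() + ".tmp");
Files.deleteIfExists(prunedSSTFilePath);
File prunedSSTFile = Files.createFile(prunedSSTFilePath).toFile();
```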
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedOptions managedOptions = new ManagedOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {

Review Comment:
   We should bound this loop by the size of the queue, at most the capacity used to initialize the queue.
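   A rough sketch of the bounded loop, assuming the bound is the number of entries queued when the run starts:

```java
// Drain at most the entries that were already queued when this run started,
// so a single run of the task cannot spin indefinitely on newly arriving entries.
int entriesToProcess = pruneQueue.size();
for (int i = 0; i < entriesToProcess; i++) {
  Pair<byte[], CompactionLogEntry> compactionLogTableEntry = pruneQueue.poll();
  if (compactionLogTableEntry == null) {
    break;
  }
  // ... existing per-entry pruning logic ...
}
```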
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedOptions managedOptions = new ManagedOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();
+          ReentrantReadWriteLock.WriteLock sstWriteLock = getSSTFileLock(sstFile.getAbsolutePath()).writeLock();
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFile.getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+
+            sstFileWriter.open(prunedSSTFile.getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+            sstFileWriter.finish();
+
+            // Take a write lock on the SST File
+            sstWriteLock.lock();
+            // Move file.sst.tmp to file.sst and replace existing file atomically
+            Files.move(prunedSSTFile.toPath(), sstFile.toPath(),
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (Exception e) {
+            pruneQueue.put(compactionLogTableEntry);
+            LOG.error("Could not prune OMKeyInfo from {}. Reprocessing.", sstFileName, e);
+            continue;
+          } finally {
+            Files.deleteIfExists(prunedSSTFile.toPath());
+            if (sstWriteLock.isHeldByCurrentThread()) {
+              sstWriteLock.unlock();
+            }
+          }
+
+          // Put pruned flag in compaction DAG. Update CompactionNode in Map.
+          compactionNodeMap.get(sstFileName).setPruned();

Review Comment:
   Why do we need to update the `compactionNode`? A file will be pruned only once during the life cycle of the process, whether it was added by the compaction listener or by the OM start-up process.

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1445,113 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedOptions managedOptions = new ManagedOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null) {
+
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          if (compactionNodeMap.get(sstFileName).isPruned()) {
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile();
+          if (Files.notExists(sstFile.toPath())) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile();
+          ReentrantReadWriteLock.WriteLock sstWriteLock = getSSTFileLock(sstFile.getAbsolutePath()).writeLock();
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFile.getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+
+            sstFileWriter.open(prunedSSTFile.getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+            sstFileWriter.finish();
+
+            // Take a write lock on the SST File

Review Comment:
   nit: no need to add this comment or the next one.

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -171,6 +185,9 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private ColumnFamilyHandle compactionLogTableCFHandle;
   private ManagedRocksDB activeRocksDB;
   private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;
+  private boolean isNativeLibsLoaded;
+  private BlockingQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =
+      new LinkedBlockingQueue<Pair<byte[], CompactionLogEntry>>();

Review Comment:
   We should limit the `BlockingQueue` to a few thousand entries.
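   For example, a bounded queue could look like the sketch below; the capacity constant is illustrative, not from the PR:

```java
// Cap the prune queue so a long backlog of compactions cannot grow without bound.
private static final int PRUNE_QUEUE_CAPACITY = 5000;
private final BlockingQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =
    new LinkedBlockingQueue<>(PRUNE_QUEUE_CAPACITY);
```

   With a bounded queue, the producer side would then need `offer(...)` with a log-and-drop (or retry) path instead of a blocking `put(...)` in the compaction listener.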
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1140,7 +1193,17 @@ private void addFileInfoToCompactionLogTable(
       builder.setCompactionReason(compactionReason);
     }
 
-    addToCompactionLogTable(builder.build());
+    CompactionLogEntry compactionLogEntry = builder.build();
+    byte[] key = addToCompactionLogTable(compactionLogEntry);
+    // Add the compaction log entry to the prune queue so that the backup input sst files can be pruned.
+    if (isNativeLibsLoaded) {
+      try {
+        pruneQueue.put(Pair.of(key, compactionLogEntry));
+      } catch (InterruptedException e) {
+        LOG.error("Could not add backup SST files in queue to be pruned. {}",
+            compactionLogEntry.getInputFileInfoList(), e);
+      }
+    }

Review Comment:
   I'd suggest moving this code [here](https://github.com/apache/ozone/blob/73e0d2a38399ddb004426e2276c6ff024bc49220/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java#L815). Otherwise, we will miss new entries from the table.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org