SaketaChalamchala commented on code in PR #8214: URL: https://github.com/apache/ozone/pull/8214#discussion_r2056793305
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java: ########## @@ -1382,6 +1445,113 @@ public void pruneSstFiles() { } } + /** + * Defines the task that removes OMKeyInfo from SST files from backup directory to + * save disk space. + */ + public void pruneSstFileValues() { + if (!shouldRun()) { + return; + } + Path sstBackupDirPath = Paths.get(sstBackupDir); + try (ManagedEnvOptions envOptions = new ManagedEnvOptions(); + ManagedOptions managedOptions = new ManagedOptions(); + ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) { + + Pair<byte[], CompactionLogEntry> compactionLogTableEntry; + while ((compactionLogTableEntry = pruneQueue.poll()) != null) { + + byte[] key = compactionLogTableEntry.getKey(); + CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue(); + boolean shouldUpdateTable = false; + List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList(); + List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>(); + for (CompactionFileInfo fileInfo : fileInfoList) { + if (fileInfo.isPruned()) { + updatedFileInfoList.add(fileInfo); + continue; + } + String sstFileName = fileInfo.getFileName(); + if (compactionNodeMap.get(sstFileName).isPruned()) { + shouldUpdateTable = true; + fileInfo.setPruned(); + updatedFileInfoList.add(fileInfo); + continue; + } + File sstFile = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX).toFile(); + if (Files.notExists(sstFile.toPath())) { + LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFile); + updatedFileInfoList.add(fileInfo); + continue; + } + + // Write the file.sst => file.sst.tmp + File prunedSSTFile = Files.createFile(sstBackupDirPath.resolve(sstFile.getName() + ".tmp")).toFile(); + ReentrantReadWriteLock.WriteLock sstWriteLock = getSSTFileLock(sstFile.getAbsolutePath()).writeLock(); + try 
(ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>( + managedOptions, sstFile.getAbsolutePath(), SST_READ_AHEAD_SIZE); + ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator( + keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) { + + sstFileWriter.open(prunedSSTFile.getAbsolutePath()); + while (itr.hasNext()) { + Pair<byte[], Integer> keyValue = itr.next(); + if (keyValue.getValue() == 0) { + sstFileWriter.delete(keyValue.getKey()); + } else { + sstFileWriter.put(keyValue.getKey(), new byte[0]); + } + } + sstFileWriter.finish(); + + // Take a write lock on the SST File + sstWriteLock.lock(); + // Move file.sst.tmp to file.sst and replace existing file atomically + Files.move(prunedSSTFile.toPath(), sstFile.toPath(), + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } catch (Exception e) { + pruneQueue.put(compactionLogTableEntry); + LOG.error("Could not prune OMKeyInfo from {}. Reprocessing.", sstFileName, e); + continue; + } finally { + Files.deleteIfExists(prunedSSTFile.toPath()); + if (sstWriteLock.isHeldByCurrentThread()) { + sstWriteLock.unlock(); + } + } + + // Put pruned flag in compaction DAG. Update CompactionNode in Map. + compactionNodeMap.get(sstFileName).setPruned(); Review Comment: Yeah... the idea was to check the in-heap compactionNodeMap to verify whether the file had already been pruned, in case the thread was stopped. But it doesn't look like it's necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org