SaketaChalamchala commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2072116029
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files in the backup
+   * directory to save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (IOException | InterruptedException e) {
+            shouldReprocess = true;
+            continue;
+          } finally {
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);

Review Comment:
   Thanks for the review @swamirishi. Do you see any reason why pruning the files in one compactionEntry would fail every time? That would cause the queue to be backed up forever. Otherwise, these changes make sense.
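   If a permanently failing entry is a real possibility, one way to guard against it is to cap how many times a failing entry is re-queued before dropping it. The snippet below is only an illustrative sketch, not part of this PR: MAX_PRUNE_ATTEMPTS, pruneAttempts, and requeueOrDrop are hypothetical names, and pruneQueue and LOG are assumed to be the existing fields of RocksDBCheckpointDiffer (imports needed: java.nio.ByteBuffer, java.util.Map, java.util.concurrent.ConcurrentHashMap).

       private static final int MAX_PRUNE_ATTEMPTS = 3;
       private final Map<ByteBuffer, Integer> pruneAttempts = new ConcurrentHashMap<>();

       private void requeueOrDrop(byte[] key, Pair<byte[], CompactionLogEntry> entry) {
         // Wrap the raw key so the map gets content-based equals/hashCode.
         ByteBuffer wrappedKey = ByteBuffer.wrap(key);
         int attempts = pruneAttempts.merge(wrappedKey, 1, Integer::sum);
         if (attempts < MAX_PRUNE_ATTEMPTS) {
           // Presumably a transient failure: retry in a later batch.
           pruneQueue.offer(entry);
         } else {
           // Persistently failing entry: drop it so it cannot back up the queue forever.
           pruneAttempts.remove(wrappedKey);
           LOG.warn("Dropping compaction entry from prune queue after {} failed attempts.", attempts);
         }
       }

   With a bound like this, transient IO errors still get retried while a poison entry cannot starve the rest of the queue.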
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org