SaketaChalamchala commented on code in PR #8214: URL: https://github.com/apache/ozone/pull/8214#discussion_r2080314999
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java: ########## @@ -1200,6 +1249,109 @@ public void pruneSstFiles() { } } + /** + * Defines the task that removes OMKeyInfo from SST files from backup directory to + * save disk space. + */ + public void pruneSstFileValues() { + if (!shouldRun()) { + return; + } + + Path sstBackupDirPath = Paths.get(sstBackupDir); + Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP); + try (ManagedOptions managedOptions = new ManagedOptions(); + ManagedEnvOptions envOptions = new ManagedEnvOptions(); + ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) { + byte[] compactionLogEntryKey; + int batchCounter = 0; + while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) { + CompactionLogEntry compactionLogEntry; + synchronized (this) { + try { + compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat( + activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey)); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to get compaction log entry.", ex); + } + } + boolean shouldUpdateTable = false; + List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList(); + List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>(); + for (CompactionFileInfo fileInfo : fileInfoList) { + if (fileInfo.isPruned()) { + updatedFileInfoList.add(fileInfo); + continue; + } + Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX); + if (Files.notExists(sstFilePath)) { + LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath); + updatedFileInfoList.add(fileInfo); + continue; + } + + // Write the file.sst => pruned.sst.tmp + Files.deleteIfExists(prunedSSTFilePath); + try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new 
ManagedRawSSTFileReader<>( + managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE); + ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator( + keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) { + sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath()); + while (itr.hasNext()) { + Pair<byte[], Integer> keyValue = itr.next(); + if (keyValue.getValue() == 0) { + sstFileWriter.delete(keyValue.getKey()); + } else { + sstFileWriter.put(keyValue.getKey(), new byte[0]); + } + } + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex); + } finally { + try { + sstFileWriter.finish(); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex); + } + } + + // Move file.sst.tmp to file.sst and replace existing file atomically + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + Files.move(prunedSSTFilePath, sstFilePath, + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + shouldUpdateTable = true; + fileInfo.setPruned(); + updatedFileInfoList.add(fileInfo); + LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath); + } + + // Update Compaction Log table. Track keys that need updating. + if (shouldUpdateTable) { + CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(), + compactionLogEntry.getCompactionTime(), updatedFileInfoList, compactionLogEntry.getOutputFileInfoList()); + String compactionReason = compactionLogEntry.getCompactionReason(); + if (compactionReason != null) { + builder.setCompactionReason(compactionReason); + } + synchronized (this) { Review Comment: Yes, DAG pruner is removing the older compactionLogEntries (30d by default). The SST files would have been pruned by then already. 
I could wrap everything inside the while loop in a synchronized block just to be safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org