SaketaChalamchala commented on code in PR #8214: URL: https://github.com/apache/ozone/pull/8214#discussion_r2085559721
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java: ########## @@ -1200,6 +1250,109 @@ public void pruneSstFiles() { } } + /** + * Defines the task that removes OMKeyInfo from SST files from backup directory to + * save disk space. + */ + public void pruneSstFileValues() { + if (!shouldRun()) { + return; + } + + Path sstBackupDirPath = Paths.get(sstBackupDir); + Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP); + try (ManagedOptions managedOptions = new ManagedOptions(); + ManagedEnvOptions envOptions = new ManagedEnvOptions()) { + byte[] compactionLogEntryKey; + int batchCounter = 0; + while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) { + CompactionLogEntry compactionLogEntry; + // Get the compaction log entry. + synchronized (this) { + try { + compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat( + activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey)); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to get compaction log entry.", ex); + } + + boolean shouldUpdateTable = false; + List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList(); + List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>(); + for (CompactionFileInfo fileInfo : fileInfoList) { + // Skip pruning file if it is already pruned or is removed. 
+ if (fileInfo.isPruned()) { + updatedFileInfoList.add(fileInfo); + continue; + } + Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX); + if (Files.notExists(sstFilePath)) { + LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath); + updatedFileInfoList.add(fileInfo); + continue; + } + + // Prune file.sst => pruned.sst.tmp + Files.deleteIfExists(prunedSSTFilePath); + removeValueFromSSTFile(managedOptions, envOptions, sstFilePath.toFile().getAbsolutePath(), + prunedSSTFilePath.toFile().getAbsolutePath()); + + // Move pruned.sst.tmp => file.sst and replace existing file atomically. + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + Files.move(prunedSSTFilePath, sstFilePath, + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + shouldUpdateTable = true; + fileInfo.setPruned(); + updatedFileInfoList.add(fileInfo); + LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath); + } + + // Update compaction log entry in table. 
+ if (shouldUpdateTable) { + CompactionLogEntry.Builder builder = CompactionLogEntry.toBuilder(compactionLogEntry); + builder.updateInputFileInoList(updatedFileInfoList); + try { + activeRocksDB.get().put(compactionLogTableCFHandle, compactionLogEntryKey, + builder.build().getProtobuf().toByteArray()); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to update the compaction log table for entry: " + + compactionLogEntry, ex); + } + } + } + pruneQueue.poll(); + } + } catch (IOException | InterruptedException e) { + LOG.error("Could not prune source OMKeyInfo from backup SST files.", e); + } + } + + private void removeValueFromSSTFile(ManagedOptions options, ManagedEnvOptions envOptions, + String sstFilePath, String prunedFilePath) + throws IOException { + ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, options); + try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>( + options, sstFilePath, SST_READ_AHEAD_SIZE); + ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator( + keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) { + sstFileWriter.open(prunedFilePath); + while (itr.hasNext()) { + Pair<byte[], Integer> keyValue = itr.next(); + if (keyValue.getValue() == 0) { + sstFileWriter.delete(keyValue.getKey()); + } else { + sstFileWriter.put(keyValue.getKey(), EMPTY_BYTE_ARRAY); + } + } + sstFileWriter.finish(); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex); + } finally { + sstFileWriter.close(); + } Review Comment: ```suggestion } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org