swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2071511787
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -1889,6 +1909,94 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
     );
   }
+
+  /**
+   * Test that backup SST files are pruned on loading previous compaction logs.
+   */
+  @EnabledIfSystemProperty(named = ROCKS_TOOLS_NATIVE_PROPERTY, matches = "true")

Review Comment:
   We need not check if rocks tools is loaded. We can just mock the raw sst file iterator construction. This test should be more of a unit test case.
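   For illustration, a sketch of the suggested mock-based setup (assumptions: mockito-inline's mockConstruction is on the test classpath, the differ under test is named differ, and the assertions are placeholders; ManagedSstFileWriter construction could be mocked the same way):

    import static java.nio.charset.StandardCharsets.UTF_8;
    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.mockConstruction;
    import static org.mockito.Mockito.when;

    import org.mockito.MockedConstruction;

    // Intercept every `new ManagedRawSSTFileReader(...)` so the test never
    // needs the rocks-native library to be loaded.
    try (MockedConstruction<ManagedRawSSTFileReader> mockedReader =
             mockConstruction(ManagedRawSSTFileReader.class, (reader, context) -> {
               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = mock(ManagedRawSSTFileIterator.class);
               when(itr.hasNext()).thenReturn(true, false);  // yield one key, then stop
               when(itr.next()).thenReturn(Pair.of("key1".getBytes(UTF_8), 1));
               when(reader.newIterator(any(), any(), any())).thenReturn(itr);
             })) {
      differ.pruneSstFileValues();
      // assert that the backup SST file was rewritten and the entry marked as pruned
    }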
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;

Review Comment:
   Let's break this loop altogether in the case of an exception.

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                  managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (IOException | InterruptedException e) {
+            shouldReprocess = true;
+            continue;
+          } finally {
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);

Review Comment:
   How about having the same path for all the tmp files and just deleting it at the start of the sstFileWriter instead of having this rollback finally block in multiple places? Something like "pruned.sst.tmp".
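   A minimal sketch of that suggestion, reusing the names from this PR (only the fixed tmp-file name is new):

    // One well-known temp file, shared by every pruning iteration.
    Path prunedSSTFilePath = sstBackupDirPath.resolve("pruned.sst.tmp");
    // Delete any leftover from a previous failed run before opening the writer,
    // so no per-file rollback finally blocks are needed further down.
    Files.deleteIfExists(prunedSSTFilePath);
    sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());

   A crash then leaves at most one stale pruned.sst.tmp behind, and the next run cleans it up before writing.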
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                  managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);

Review Comment:
   https://github.com/apache/ozone/pull/8277

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -163,14 +175,18 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
   private final long maxAllowedTimeInDag;
   private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock();
-
+  private static final int SST_READ_AHEAD_SIZE = 2 * 1024 * 1024;
+  private int pruneSSTFileBatchSize;
   private ColumnFamilyHandle snapshotInfoTableCFHandle;
   private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
   private AtomicBoolean suspended;
   private ColumnFamilyHandle compactionLogTableCFHandle;
   private ManagedRocksDB activeRocksDB;
   private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;
+  private boolean isNativeLibsLoaded = false;
+  private ConcurrentLinkedQueue<Pair<byte[], CompactionLogEntry>> pruneQueue =

Review Comment:
   Keep this null or Optional.empty() by default and only initialize it if the prune thread is going to run.
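   A sketch of the suggested lazy initialization, reusing this PR's isNativeLibsLoaded flag (the exact init site in the constructor is an assumption):

    // Stays null unless the prune thread is actually going to run.
    private ConcurrentLinkedQueue<Pair<byte[], CompactionLogEntry>> pruneQueue = null;

    // In the constructor / init path:
    if (isNativeLibsLoaded) {
      pruneQueue = new ConcurrentLinkedQueue<>();
    }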
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java:
##########
@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;

Review Comment:
   Why this change?

##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.DELETE;
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.MODIFY;
 import static org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.RENAME;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;

Review Comment:
   Why is this change required?

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                  managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);

Review Comment:
   Why a RuntimeException? Convert the RocksDBException into a RocksDatabaseException.
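   Presumably something along these lines (assuming RocksDatabaseException offers a (String, Exception) constructor; since it is a checked exception, pruneSstFileValues() would then have to declare or handle it):

    } catch (RocksDBException e) {
      // Wrap in the domain-specific checked exception instead of RuntimeException.
      throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, e);
    }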
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -526,12 +562,17 @@ public void onCompactionCompleted(RocksDB db,
         inflightCompactions.remove(inputFile);
       }
     }
+    // Add the compaction log entry to the prune queue
+    // so that the backup input sst files can be pruned.
+    if (isNativeLibsLoaded) {
+      pruneQueue.offer(Pair.of(key, compactionLogEntry));

Review Comment:
   Instead of a pair, just add the key to the queue. We need not keep the compactionLogEntry in memory.

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -526,12 +562,17 @@ public void onCompactionCompleted(RocksDB db,
         inflightCompactions.remove(inputFile);
       }
     }
+    // Add the compaction log entry to the prune queue
+    // so that the backup input sst files can be pruned.
+    if (isNativeLibsLoaded) {

Review Comment:
   I don't like having this check in multiple places. We should just check pruneQueue != null.
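   Taken together with the null-by-default queue suggested earlier, the enqueue site might reduce to a sketch like this (the prune task would then re-read the CompactionLogEntry from the compaction log table by key):

    // Queue holds only the raw table key; the entry itself is not kept in
    // memory, and a null queue doubles as the "pruning disabled" signal.
    if (pruneQueue != null) {
      pruneQueue.offer(key);
    }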
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          String sstFileName = fileInfo.getFileName();
+          Path sstFilePath = sstBackupDirPath.resolve(sstFileName + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => file.sst.tmp
+          Path prunedSSTFilePath = sstBackupDirPath.resolve(sstFilePath.toFile().getName() + ".tmp");
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                  managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            Files.deleteIfExists(prunedSSTFilePath);
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (IOException | RocksDBException e) {
+            shouldReprocess = true;
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);
+            } catch (IOException ex) { }
+            continue;
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException e) {
+              throw new RuntimeException("Failed to finish writing to " + prunedSSTFilePath, e);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          } catch (IOException | InterruptedException e) {
+            shouldReprocess = true;
+            continue;
+          } finally {
+            try {
+              Files.deleteIfExists(prunedSSTFilePath);

Review Comment:
   Let us just break the entire thread run if an exception is thrown. All this exception-handling logic convolutes the code and IMO is unnecessary.

##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1382,6 +1428,115 @@ public void pruneSstFiles() {
     }
   }
+
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      Pair<byte[], CompactionLogEntry> compactionLogTableEntry;
+      int batchCounter = 0;
+      while ((compactionLogTableEntry = pruneQueue.poll()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        byte[] key = compactionLogTableEntry.getKey();
+        CompactionLogEntry compactionLogEntry = compactionLogTableEntry.getValue();
+        boolean shouldUpdateTable = false;
+        boolean shouldReprocess = false;

Review Comment:
   How about just peeking instead of polling at the start? Then we wouldn't need this shouldReprocess boolean value.
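   A sketch of the peek-first loop (pruneEntry is a hypothetical helper that throws on failure):

    Pair<byte[], CompactionLogEntry> entry;
    int batchCounter = 0;
    // Peek first and poll only after the entry is fully processed; a failure
    // leaves the entry at the head for the next run, so no shouldReprocess flag.
    while ((entry = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
      pruneEntry(entry);  // hypothetical helper; throws on failure
      pruneQueue.poll();  // dequeue only after success
    }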