swamirishi commented on code in PR #8214: URL: https://github.com/apache/ozone/pull/8214#discussion_r2080245011
########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java: ########## @@ -1903,6 +1930,109 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() { ); } + + /** + * Test that backup SST files are pruned on loading previous compaction logs. + */ + @Test + public void testPruneSSTFileValues() throws Exception { + + List<Pair<byte[], Integer>> keys = new ArrayList<Pair<byte[], Integer>>(); + keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1))); + keys.add(Pair.of("key2".getBytes(UTF_8), Integer.valueOf(0))); + keys.add(Pair.of("key3".getBytes(UTF_8), Integer.valueOf(1))); + + String inputFile78 = "000078"; + String inputFile73 = "000073"; + String outputFile81 = "000081"; + // Create src & destination files in backup & activedirectory. + // Pruning job should succeed when pruned temp file is already present. + createSSTFileWithKeys(sstBackUpDir + "/" + inputFile78 + SST_FILE_EXTENSION, keys); + createSSTFileWithKeys(sstBackUpDir + "/" + inputFile73 + SST_FILE_EXTENSION, keys); + createSSTFileWithKeys(sstBackUpDir + PRUNED_SST_FILE_TEMP, keys); + createSSTFileWithKeys(activeDbDir + "/" + outputFile81 + SST_FILE_EXTENSION, keys); + + // Load compaction log + CompactionLogEntry compactionLogEntry = new CompactionLogEntry(178, System.currentTimeMillis(), + Arrays.asList( + new CompactionFileInfo(inputFile78, "/volume/bucket1/key-5", "/volume/bucket2/key-10", "keyTable"), + new CompactionFileInfo(inputFile73, "/volume/bucket1/key-1", "/volume/bucket2/key-5", "keyTable")), + Collections.singletonList( + new CompactionFileInfo(outputFile81, "/volume/bucket1/key-1", "/volume/bucket2/key-10", "keyTable")), + null + ); + byte[] compactionLogEntryKey = rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry); + rocksDBCheckpointDiffer.loadAllCompactionLogs(); + + // Pruning should not fail a source SST file has been removed by a another pruner. 
+ Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + SST_FILE_EXTENSION)); + // Run the SST file pruner. + ManagedRawSSTFileIterator mockedRawSSTFileItr = mock(ManagedRawSSTFileIterator.class); + Iterator keyItr = keys.iterator(); + when(mockedRawSSTFileItr.hasNext()).thenReturn(true, true, true, false); + when(mockedRawSSTFileItr.next()).thenReturn(keyItr.next(), keyItr.next(), keyItr.next()); + try (MockedConstruction<ManagedRawSSTFileReader> mockedRawSSTReader = Mockito.mockConstruction( + ManagedRawSSTFileReader.class, (mock, context) -> { + when(mock.newIterator(any(), any(), any())).thenReturn(mockedRawSSTFileItr); + doNothing().when(mock).close(); + })) { + rocksDBCheckpointDiffer.pruneSstFileValues(); + } + // pruned.sst.tmp should be deleted when pruning job exits successfully. + assertFalse(Files.exists(sstBackUpDir.toPath().resolve(PRUNED_SST_FILE_TEMP))); + + CompactionLogEntry updatedLogEntry; + try { + updatedLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat( + activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey)); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to get compaction log entry.", ex); + } + CompactionFileInfo fileInfo78 = updatedLogEntry.getInputFileInfoList().get(0); + CompactionFileInfo fileInfo73 = updatedLogEntry.getInputFileInfoList().get(1); + + // Verify 000078.sst has been pruned + assertEquals(inputFile78, fileInfo78.getFileName()); + assertTrue(fileInfo78.isPruned()); + ManagedSstFileReader sstFileReader = new ManagedSstFileReader(new ManagedOptions()); + sstFileReader.open(sstBackUpDir.toPath().resolve(inputFile78 + SST_FILE_EXTENSION).toFile().getAbsolutePath()); + ManagedSstFileReaderIterator itr = ManagedSstFileReaderIterator + .managed(sstFileReader.newIterator(new ManagedReadOptions())); + itr.get().seekToFirst(); + int prunedKeys = 0; + while (itr.get().isValid()) { + // Verify that value is removed for non-tombstone keys. 
+ assertEquals(0, itr.get().value().length); + prunedKeys++; + itr.get().next(); + } + assertEquals(2, prunedKeys); + itr.close(); + sstFileReader.close(); + + // Verify 000073.sst pruning has been skipped + assertFalse(fileInfo73.isPruned()); + } + + private void createSSTFileWithKeys(String filePath, List<Pair<byte[], Integer>> keys) + throws Exception { + try (ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(new ManagedEnvOptions(), new ManagedOptions())) { Review Comment: Can we mock sst file writer to write a text file instead and also mock rawSstFileReader to read the text file ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java: ########## @@ -1200,6 +1249,109 @@ public void pruneSstFiles() { } } + /** + * Defines the task that removes OMKeyInfo from SST files from backup directory to + * save disk space. + */ + public void pruneSstFileValues() { + if (!shouldRun()) { + return; + } + + Path sstBackupDirPath = Paths.get(sstBackupDir); + Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP); + try (ManagedOptions managedOptions = new ManagedOptions(); + ManagedEnvOptions envOptions = new ManagedEnvOptions(); + ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) { + byte[] compactionLogEntryKey; + int batchCounter = 0; + while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) { + CompactionLogEntry compactionLogEntry; + synchronized (this) { + try { + compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat( + activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey)); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to get compaction log entry.", ex); + } + } + boolean shouldUpdateTable = false; + List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList(); + List<CompactionFileInfo> 
updatedFileInfoList = new ArrayList<>(); + for (CompactionFileInfo fileInfo : fileInfoList) { + if (fileInfo.isPruned()) { + updatedFileInfoList.add(fileInfo); + continue; + } + Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX); + if (Files.notExists(sstFilePath)) { + LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath); + updatedFileInfoList.add(fileInfo); + continue; + } + + // Write the file.sst => pruned.sst.tmp + Files.deleteIfExists(prunedSSTFilePath); + try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>( + managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE); + ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator( + keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) { + sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath()); + while (itr.hasNext()) { + Pair<byte[], Integer> keyValue = itr.next(); + if (keyValue.getValue() == 0) { + sstFileWriter.delete(keyValue.getKey()); + } else { + sstFileWriter.put(keyValue.getKey(), new byte[0]); + } + } + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex); + } finally { + try { + sstFileWriter.finish(); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex); + } + } + + // Move file.sst.tmp to file.sst and replace existing file atomically + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + Files.move(prunedSSTFilePath, sstFilePath, + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + shouldUpdateTable = true; + fileInfo.setPruned(); + updatedFileInfoList.add(fileInfo); + LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath); + } + + // Update Compaction Log table. Track keys that need updating. 
+ if (shouldUpdateTable) { Review Comment: Wouldn't this be all or none? Aren't we pruning all files in the compactionLogEntry and adding to the table? ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java: ########## @@ -1200,6 +1249,109 @@ public void pruneSstFiles() { } } + /** + * Defines the task that removes OMKeyInfo from SST files from backup directory to + * save disk space. + */ + public void pruneSstFileValues() { + if (!shouldRun()) { + return; + } + + Path sstBackupDirPath = Paths.get(sstBackupDir); + Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP); + try (ManagedOptions managedOptions = new ManagedOptions(); + ManagedEnvOptions envOptions = new ManagedEnvOptions(); + ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) { + byte[] compactionLogEntryKey; + int batchCounter = 0; + while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) { + CompactionLogEntry compactionLogEntry; + synchronized (this) { + try { + compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat( + activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey)); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to get compaction log entry.", ex); + } + } + boolean shouldUpdateTable = false; + List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList(); + List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>(); + for (CompactionFileInfo fileInfo : fileInfoList) { + if (fileInfo.isPruned()) { + updatedFileInfoList.add(fileInfo); + continue; + } + Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX); + if (Files.notExists(sstFilePath)) { + LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath); + updatedFileInfoList.add(fileInfo); + continue; + } + + // 
Write the file.sst => pruned.sst.tmp + Files.deleteIfExists(prunedSSTFilePath); + try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>( + managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE); + ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator( + keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) { + sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath()); + while (itr.hasNext()) { + Pair<byte[], Integer> keyValue = itr.next(); + if (keyValue.getValue() == 0) { + sstFileWriter.delete(keyValue.getKey()); + } else { + sstFileWriter.put(keyValue.getKey(), new byte[0]); + } + } + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex); + } finally { + try { + sstFileWriter.finish(); + } catch (RocksDBException ex) { + throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex); + } + } + + // Move file.sst.tmp to file.sst and replace existing file atomically + try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + Files.move(prunedSSTFilePath, sstFilePath, + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + shouldUpdateTable = true; + fileInfo.setPruned(); + updatedFileInfoList.add(fileInfo); + LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath); + } + + // Update Compaction Log table. Track keys that need updating. 
+ if (shouldUpdateTable) { + CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(), + compactionLogEntry.getCompactionTime(), updatedFileInfoList, compactionLogEntry.getOutputFileInfoList()); + String compactionReason = compactionLogEntry.getCompactionReason(); + if (compactionReason != null) { + builder.setCompactionReason(compactionReason); + } + synchronized (this) { Review Comment: yeah we should synchronize based on the entry. It is ok to put in one synchronized block as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org