swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2080245011


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -1903,6 +1930,109 @@ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
     );
   }
 
+
+  /**
+   * Test that backup SST files are pruned on loading previous compaction logs.
+   */
+  @Test
+  public void testPruneSSTFileValues() throws Exception {
+
+    List<Pair<byte[], Integer>> keys = new ArrayList<>();
+    keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1)));
+    keys.add(Pair.of("key2".getBytes(UTF_8), Integer.valueOf(0)));
+    keys.add(Pair.of("key3".getBytes(UTF_8), Integer.valueOf(1)));
+
+    String inputFile78 = "000078";
+    String inputFile73 = "000073";
+    String outputFile81 = "000081";
+    // Create source & destination files in the backup & active directories.
+    // The pruning job should succeed when the pruned temp file is already present.
+    createSSTFileWithKeys(sstBackUpDir + "/" + inputFile78 + SST_FILE_EXTENSION, keys);
+    createSSTFileWithKeys(sstBackUpDir + "/" + inputFile73 + SST_FILE_EXTENSION, keys);
+    createSSTFileWithKeys(sstBackUpDir + "/" + PRUNED_SST_FILE_TEMP, keys);
+    createSSTFileWithKeys(activeDbDir + "/" + outputFile81 + SST_FILE_EXTENSION, keys);
+
+    // Load compaction log
+    CompactionLogEntry compactionLogEntry = new CompactionLogEntry(178, System.currentTimeMillis(),
+        Arrays.asList(
+            new CompactionFileInfo(inputFile78, "/volume/bucket1/key-5", "/volume/bucket2/key-10", "keyTable"),
+            new CompactionFileInfo(inputFile73, "/volume/bucket1/key-1", "/volume/bucket2/key-5", "keyTable")),
+        Collections.singletonList(
+            new CompactionFileInfo(outputFile81, "/volume/bucket1/key-1", "/volume/bucket2/key-10", "keyTable")),
+        null
+    );
+    byte[] compactionLogEntryKey = rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry);
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+    // Pruning should not fail if a source SST file has been removed by another pruner.
+    Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + SST_FILE_EXTENSION));
+    // Run the SST file pruner.
+    ManagedRawSSTFileIterator mockedRawSSTFileItr = mock(ManagedRawSSTFileIterator.class);
+    Iterator keyItr = keys.iterator();
+    when(mockedRawSSTFileItr.hasNext()).thenReturn(true, true, true, false);
+    when(mockedRawSSTFileItr.next()).thenReturn(keyItr.next(), keyItr.next(), keyItr.next());
+    try (MockedConstruction<ManagedRawSSTFileReader> mockedRawSSTReader = Mockito.mockConstruction(
+        ManagedRawSSTFileReader.class, (mock, context) -> {
+          when(mock.newIterator(any(), any(), any())).thenReturn(mockedRawSSTFileItr);
+          doNothing().when(mock).close();
+        })) {
+      rocksDBCheckpointDiffer.pruneSstFileValues();
+    }
+    // pruned.sst.tmp should be deleted when pruning job exits successfully.
+    assertFalse(Files.exists(sstBackUpDir.toPath().resolve(PRUNED_SST_FILE_TEMP)));
+
+    CompactionLogEntry updatedLogEntry;
+    try {
+      updatedLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+          activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+    } catch (RocksDBException ex) {
+      throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+    }
+    CompactionFileInfo fileInfo78 = updatedLogEntry.getInputFileInfoList().get(0);
+    CompactionFileInfo fileInfo73 = updatedLogEntry.getInputFileInfoList().get(1);
+
+    // Verify 000078.sst has been pruned
+    assertEquals(inputFile78, fileInfo78.getFileName());
+    assertTrue(fileInfo78.isPruned());
+    ManagedSstFileReader sstFileReader = new ManagedSstFileReader(new ManagedOptions());
+    sstFileReader.open(sstBackUpDir.toPath().resolve(inputFile78 + SST_FILE_EXTENSION).toFile().getAbsolutePath());
+    ManagedSstFileReaderIterator itr = ManagedSstFileReaderIterator
+        .managed(sstFileReader.newIterator(new ManagedReadOptions()));
+    itr.get().seekToFirst();
+    int prunedKeys = 0;
+    while (itr.get().isValid()) {
+      // Verify that value is removed for non-tombstone keys.
+      assertEquals(0, itr.get().value().length);
+      prunedKeys++;
+      itr.get().next();
+    }
+    assertEquals(2, prunedKeys);
+    itr.close();
+    sstFileReader.close();
+
+    // Verify 000073.sst pruning has been skipped
+    assertFalse(fileInfo73.isPruned());
+  }
+
+  private void createSSTFileWithKeys(String filePath, List<Pair<byte[], Integer>> keys)
+      throws Exception {
+    try (ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(new ManagedEnvOptions(), new ManagedOptions())) {

Review Comment:
   Can we mock the SST file writer to write a text file instead, and also mock rawSstFileReader to read the text file back?
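
   A rough sketch of that approach (everything below is illustrative, not code from this PR: the helper names and the `key,type` text format are assumptions, the snippet reuses the test class's existing static imports such as `mock`, `when`, `any`, `doAnswer`, and `UTF_8`, and it assumes `ManagedSstFileWriter#put`/`#delete` mirror RocksDB's `SstFileWriter` API):

```java
// Hypothetical helpers for this test class. Writer side: capture put()/delete()
// calls into a plain text file ("key,1" for values, "key,0" for tombstones).
private MockedConstruction<ManagedSstFileWriter> mockWriterToTextFile(Path textFile) {
  return Mockito.mockConstruction(ManagedSstFileWriter.class, (mock, context) -> {
    doAnswer(inv -> {
      byte[] key = inv.getArgument(0);
      Files.write(textFile, (new String(key, UTF_8) + ",1\n").getBytes(UTF_8),
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
      return null;
    }).when(mock).put(any(byte[].class), any(byte[].class));
    doAnswer(inv -> {
      byte[] key = inv.getArgument(0);
      Files.write(textFile, (new String(key, UTF_8) + ",0\n").getBytes(UTF_8),
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
      return null;
    }).when(mock).delete(any(byte[].class));
  });
}

// Reader side: have the mocked raw SST reader replay the text file as
// (key bytes, type) pairs, matching the Pair<byte[], Integer> shape used above.
private MockedConstruction<ManagedRawSSTFileReader> mockReaderFromTextFile(Path textFile) {
  return Mockito.mockConstruction(ManagedRawSSTFileReader.class, (mock, context) -> {
    List<Pair<byte[], Integer>> entries = new ArrayList<>();
    for (String line : Files.readAllLines(textFile, UTF_8)) {
      String[] parts = line.split(",");
      entries.add(Pair.of(parts[0].getBytes(UTF_8), Integer.parseInt(parts[1])));
    }
    Iterator<Pair<byte[], Integer>> backing = entries.iterator();
    ManagedRawSSTFileIterator mockedItr = mock(ManagedRawSSTFileIterator.class);
    when(mockedItr.hasNext()).thenAnswer(inv -> backing.hasNext());
    when(mockedItr.next()).thenAnswer(inv -> backing.next());
    when(mock.newIterator(any(), any(), any())).thenReturn(mockedItr);
  });
}
```

   That would keep the key/tombstone distinction the pruner cares about without writing real SST files in the test setup.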



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files in the backup directory
+   * to save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries 
for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " 
+ prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move pruned.sst.tmp to file.sst, replacing the existing file atomically.
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {

Review Comment:
   Wouldn't this be all or none? Aren't we pruning all files in the 
compactionLogEntry and adding to the table?



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files in the backup directory
+   * to save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions();
+         ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+                   managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+               ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                   keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries 
for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " 
+ prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move pruned.sst.tmp to file.sst, replacing the existing file atomically.
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {
+          CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(),
+              compactionLogEntry.getCompactionTime(), updatedFileInfoList, compactionLogEntry.getOutputFileInfoList());
+          String compactionReason = compactionLogEntry.getCompactionReason();
+          if (compactionReason != null) {
+            builder.setCompactionReason(compactionReason);
+          }
+          synchronized (this) {

Review Comment:
   Yeah, we should synchronize based on the entry. It is OK to put it in one synchronized block as well.
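
   A minimal sketch of entry-level locking (assumes Guava's `Striped` is available on the classpath; the class, field, method, and stripe count below are illustrative, not part of this PR):

```java
import com.google.common.util.concurrent.Striped;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;

// Illustrative sketch, not part of this PR: lock per compaction log entry key.
class CompactionLogEntryLocks {
  private final Striped<Lock> entryLocks = Striped.lock(64);

  /** Run a read-modify-write of one compaction log entry under its own lock. */
  void withEntryLock(byte[] compactionLogEntryKey, Runnable readModifyWrite) {
    // ByteBuffer.wrap gives content-based equals/hashCode, so identical key
    // bytes always map onto the same stripe.
    Lock lock = entryLocks.get(ByteBuffer.wrap(compactionLogEntryKey));
    lock.lock();
    try {
      readModifyWrite.run(); // e.g. get entry -> rebuild with pruned flags -> put back
    } finally {
      lock.unlock();
    }
  }
}
```

   Keeping the get and the put of the same entry inside a single synchronized block, as noted above, would also close the window, at the cost of serializing updates to unrelated entries.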



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

