swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2085306470


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java:
##########
@@ -129,13 +129,23 @@ public String toString() {
         inputFileInfoList, outputFileInfoList, compactionReason);
   }
 
+  public static Builder toBuilder(CompactionLogEntry compactionLogEntry) {
+    Builder builder = new Builder(compactionLogEntry.getDbSequenceNumber(), 
compactionLogEntry.getCompactionTime(),
+        compactionLogEntry.getInputFileInfoList(), 
compactionLogEntry.getOutputFileInfoList());
+    String compactionReason = compactionLogEntry.getCompactionReason();
+    if (compactionLogEntry.getCompactionReason() != null) {
+      builder.setCompactionReason(compactionReason);
+    }
+    return builder;
+  }

Review Comment:
   ```suggestion
     public Builder toBuilder() {
       Builder builder = new Builder(this.getDbSequenceNumber(), this.getCompactionTime(),
           this.getInputFileInfoList(), this.getOutputFileInfoList());
       String compactionReason = this.getCompactionReason();
       if (compactionReason != null) {
         builder.setCompactionReason(compactionReason);
       }
       return builder;
     }
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java:
##########
@@ -1903,6 +1930,109 @@ private static Stream<Arguments> 
casesGetSSTDiffListWithoutDB2() {
     );
   }
 
+
+  /**
+   * Test that backup SST files are pruned on loading previous compaction logs.
+   */
+  @Test
+  public void testPruneSSTFileValues() throws Exception {
+
+    List<Pair<byte[], Integer>> keys = new ArrayList<Pair<byte[], Integer>>();
+    keys.add(Pair.of("key1".getBytes(UTF_8), Integer.valueOf(1)));
+    keys.add(Pair.of("key2".getBytes(UTF_8), Integer.valueOf(0)));
+    keys.add(Pair.of("key3".getBytes(UTF_8), Integer.valueOf(1)));
+
+    String inputFile78 = "000078";
+    String inputFile73 = "000073";
+    String outputFile81 = "000081";
+    // Create src & destination files in backup & activedirectory.
+    // Pruning job should succeed when pruned temp file is already present.
+    createSSTFileWithKeys(sstBackUpDir + "/" + inputFile78 + 
SST_FILE_EXTENSION, keys);
+    createSSTFileWithKeys(sstBackUpDir + "/" + inputFile73 + 
SST_FILE_EXTENSION, keys);
+    createSSTFileWithKeys(sstBackUpDir + PRUNED_SST_FILE_TEMP, keys);
+    createSSTFileWithKeys(activeDbDir + "/" + outputFile81 + 
SST_FILE_EXTENSION, keys);
+
+    // Load compaction log
+    CompactionLogEntry compactionLogEntry = new CompactionLogEntry(178, 
System.currentTimeMillis(),
+        Arrays.asList(
+            new CompactionFileInfo(inputFile78, "/volume/bucket1/key-5", 
"/volume/bucket2/key-10", "keyTable"),
+            new CompactionFileInfo(inputFile73, "/volume/bucket1/key-1", 
"/volume/bucket2/key-5", "keyTable")),
+        Collections.singletonList(
+            new CompactionFileInfo(outputFile81, "/volume/bucket1/key-1", 
"/volume/bucket2/key-10", "keyTable")),
+        null
+    );
+    byte[] compactionLogEntryKey = 
rocksDBCheckpointDiffer.addToCompactionLogTable(compactionLogEntry);
+    rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+    // Pruning should not fail a source SST file has been removed by a another 
pruner.
+    Files.delete(sstBackUpDir.toPath().resolve(inputFile73 + 
SST_FILE_EXTENSION));
+    // Run the SST file pruner.
+    ManagedRawSSTFileIterator mockedRawSSTFileItr = 
mock(ManagedRawSSTFileIterator.class);
+    Iterator keyItr = keys.iterator();
+    when(mockedRawSSTFileItr.hasNext()).thenReturn(true, true, true, false);
+    when(mockedRawSSTFileItr.next()).thenReturn(keyItr.next(), keyItr.next(), 
keyItr.next());
+    try (MockedConstruction<ManagedRawSSTFileReader> mockedRawSSTReader = 
Mockito.mockConstruction(
+        ManagedRawSSTFileReader.class, (mock, context) -> {
+          when(mock.newIterator(any(), any(), 
any())).thenReturn(mockedRawSSTFileItr);
+          doNothing().when(mock).close();
+        })) {
+      rocksDBCheckpointDiffer.pruneSstFileValues();
+    }
+    // pruned.sst.tmp should be deleted when pruning job exits successfully.
+    
assertFalse(Files.exists(sstBackUpDir.toPath().resolve(PRUNED_SST_FILE_TEMP)));
+
+    CompactionLogEntry updatedLogEntry;
+    try {
+      updatedLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+          activeRocksDB.get().get(compactionLogTableCFHandle, 
compactionLogEntryKey));
+    } catch (RocksDBException ex) {
+      throw new RocksDatabaseException("Failed to get compaction log entry.", 
ex);
+    }
+    CompactionFileInfo fileInfo78 = 
updatedLogEntry.getInputFileInfoList().get(0);
+    CompactionFileInfo fileInfo73 = 
updatedLogEntry.getInputFileInfoList().get(1);
+
+    // Verify 000078.sst has been pruned
+    assertEquals(inputFile78, fileInfo78.getFileName());
+    assertTrue(fileInfo78.isPruned());
+    ManagedSstFileReader sstFileReader = new ManagedSstFileReader(new 
ManagedOptions());
+    sstFileReader.open(sstBackUpDir.toPath().resolve(inputFile78 + 
SST_FILE_EXTENSION).toFile().getAbsolutePath());
+    ManagedSstFileReaderIterator itr = ManagedSstFileReaderIterator
+        .managed(sstFileReader.newIterator(new ManagedReadOptions()));
+    itr.get().seekToFirst();
+    int prunedKeys = 0;
+    while (itr.get().isValid()) {
+      // Verify that value is removed for non-tombstone keys.
+      assertEquals(0, itr.get().value().length);
+      prunedKeys++;
+      itr.get().next();
+    }
+    assertEquals(2, prunedKeys);
+    itr.close();
+    sstFileReader.close();
+
+    // Verify 000073.sst pruning has been skipped
+    assertFalse(fileInfo73.isPruned());
+  }
+
+  private void createSSTFileWithKeys(String filePath, List<Pair<byte[], 
Integer>> keys)
+      throws Exception {
+    try (ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(new 
ManagedEnvOptions(), new ManagedOptions())) {

Review Comment:
   Keeping it in an in-memory map is also fine, but I would say writing it to a text file might be a bit better for making the test easier to understand.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1250,109 @@ public void pruneSstFiles() {
     }
   }
 
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup 
directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+         ManagedEnvOptions envOptions = new ManagedEnvOptions()) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && 
++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        // Get the compaction log entry.
+        synchronized (this) {
+          try {
+            compactionLogEntry = 
CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, 
compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log 
entry.", ex);
+          }
+
+          boolean shouldUpdateTable = false;
+          List<CompactionFileInfo> fileInfoList = 
compactionLogEntry.getInputFileInfoList();
+          List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+          for (CompactionFileInfo fileInfo : fileInfoList) {
+            // Skip pruning file if it is already pruned or is removed.
+            if (fileInfo.isPruned()) {
+              updatedFileInfoList.add(fileInfo);
+              continue;
+            }
+            Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() 
+ ROCKSDB_SST_SUFFIX);
+            if (Files.notExists(sstFilePath)) {
+              LOG.debug("Skipping pruning SST file {} as it does not exist in 
backup directory.", sstFilePath);
+              updatedFileInfoList.add(fileInfo);
+              continue;
+            }
+
+            // Prune file.sst => pruned.sst.tmp
+            Files.deleteIfExists(prunedSSTFilePath);
+            removeValueFromSSTFile(managedOptions, envOptions, 
sstFilePath.toFile().getAbsolutePath(),
+                prunedSSTFilePath.toFile().getAbsolutePath());
+
+            // Move pruned.sst.tmp => file.sst and replace existing file 
atomically.
+            try (BootstrapStateHandler.Lock lock = 
getBootstrapStateLock().lock()) {
+              Files.move(prunedSSTFilePath, sstFilePath,
+                  StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING);
+            }
+            shouldUpdateTable = true;
+            fileInfo.setPruned();
+            updatedFileInfoList.add(fileInfo);
+            LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+          }
+
+          // Update compaction log entry in table.
+          if (shouldUpdateTable) {
+            CompactionLogEntry.Builder builder = 
CompactionLogEntry.toBuilder(compactionLogEntry);
+            builder.updateInputFileInoList(updatedFileInfoList);
+            try {
+              activeRocksDB.get().put(compactionLogTableCFHandle, 
compactionLogEntryKey,
+                  builder.build().getProtobuf().toByteArray());
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to update the 
compaction log table for entry: "
+                  + compactionLogEntry, ex);
+            }
+          }
+        }
+        pruneQueue.poll();
+      }
+    } catch (IOException | InterruptedException e) {
+      LOG.error("Could not prune source OMKeyInfo from backup SST files.", e);
+    }
+  }
+
+  private void removeValueFromSSTFile(ManagedOptions options, 
ManagedEnvOptions envOptions,
+      String sstFilePath, String prunedFilePath)
+      throws IOException {
+    ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, 
options);

Review Comment:
   Why not make this part of the try-with-resources block? That way the writer is always closed, even if an exception is thrown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to