Copilot commented on code in PR #8912:
URL: https://github.com/apache/ozone/pull/8912#discussion_r2346029798


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,69 +235,145 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements 
ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap 
operations.
+   */
+  private static class HeapEntry<T extends Comparable<T>>
+      implements Comparable<HeapEntry<T>>, Closeable {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+
+    HeapEntry(ClosableIterator<T> iterator) {
+      this.iterator = iterator;
+      advance();
+    }
+
+    @Override
+    public void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntry<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+      return this.currentKey.compareTo(other.currentKey);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
 
-    private final Iterator<String> fileNameIterator;
+      HeapEntry<T> other = (HeapEntry<T>) obj;
+      return compareTo(other) == 0;
+    }
 
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey);
+    }
+  }
 
-    private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+  /**
+   * The MultipleSstFileIterator class is an abstract base for iterating over 
multiple SST files.
+   * It uses a PriorityQueue to merge keys from all files in sorted order.
+   * Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
+   * which ensures stable ordering for identical keys by considering the file 
index.
+   * @param <T>
+   */
+  private abstract static class MultipleSstFileIterator<T extends 
Comparable<T>> implements ClosableIterator<T> {
+    private final PriorityQueue<HeapEntry<T>> minHeap;
+
+    private MultipleSstFileIterator(Collection<String> sstFiles) {
+      this.minHeap = new PriorityQueue<>();
       init();
+      initMinHeap(sstFiles);
     }
 
     protected abstract void init();
 
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) 
throws RocksDBException, IOException;
 
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap(Collection<String> files) {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && 
currentFileIterator.hasNext()) {
-            return true;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntry<T> entry = new HeapEntry<>(iterator);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
+          } else {
+            // No valid entries, close the iterator
+            entry.close();
           }
-        } while (moveToNextFile());
+        }
       } catch (IOException | RocksDBException e) {
-        // TODO: [Snapshot] This exception has to be handled by the caller.
-        //  We have to do better exception handling.
-        throw new RuntimeException(e);
+        // Clean up any opened iterators
+        close();
+        throw new RuntimeException("Failed to initialize SST file iterators", 
e);
       }
-      return false;
     }
 
     @Override
-    public T next() {
-      if (hasNext()) {
-        return currentFileIterator.next();
-      }
-      throw new NoSuchElementException("No more elements found.");
+    public boolean hasNext() {
+      return !minHeap.isEmpty();
     }
 
     @Override
-    public void close() throws UncheckedIOException {
-      try {
-        closeCurrentFile();
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more elements found.");
       }
-    }
 
-    private boolean moveToNextFile() throws IOException, RocksDBException {
-      if (fileNameIterator.hasNext()) {
-        closeCurrentFile();
-        currentFile = fileNameIterator.next();
-        this.currentFileIterator = getKeyIteratorForFile(currentFile);
-        return true;
+      assert minHeap.peek() != null;
+      // Get current key from heap
+      HeapEntry<T> currentKey = minHeap.peek();
+
+      // Advance all entries with the same key (from different files)
+      // and keep the one with the highest file index
+      while (!minHeap.isEmpty() && Objects.equals(minHeap.peek(), currentKey)) 
{

Review Comment:
   The comment says the loop keeps 'the one with the highest file index', but 
the code does not implement this. It advances every entry whose current key 
equals the smallest key, without choosing among them by file index. 
`HeapEntry` has no field that tracks or compares a file index. Either add one 
and use it as a tie-breaker in `compareTo`, or update the comment to describe 
the actual behavior.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,69 +235,145 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements 
ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap 
operations.
+   */
+  private static class HeapEntry<T extends Comparable<T>>
+      implements Comparable<HeapEntry<T>>, Closeable {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+
+    HeapEntry(ClosableIterator<T> iterator) {
+      this.iterator = iterator;
+      advance();
+    }
+
+    @Override
+    public void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntry<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+      return this.currentKey.compareTo(other.currentKey);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
 
-    private final Iterator<String> fileNameIterator;
+      HeapEntry<T> other = (HeapEntry<T>) obj;

Review Comment:
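   Unchecked cast should be avoided. Add a type check before casting, or use 
`@SuppressWarnings("unchecked")` if the cast is guaranteed to be safe. Here the 
preceding `getClass()` check already rules out a `ClassCastException`, so 
suppressing the warning on this single statement is sufficient.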



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java:
##########
@@ -87,14 +98,32 @@ private String createRandomSSTFile(TreeMap<String, Integer> 
keys)
     return file.getAbsolutePath();
   }
 
+  /**
+   * Helper method to create a map of keys with values alternating between 0 
and 1.
+   * Keys with even indices get value 0 (will be treated as 
deletions/tombstones),
+   * keys with odd indices get value 1 (will be treated as regular entries).
+   *
+   * @param startRange Starting range for key generation (inclusive)
+   * @param endRange Ending range for key generation (exclusive)
+   * @return Map of keys with alternating 0/1 values
+   */
   private Map<String, Integer> createKeys(int startRange, int endRange) {
     return IntStream.range(startRange, endRange).boxed()
         .collect(Collectors.toMap(i -> KEY_PREFIX + i,
             i -> i % 2));
   }
 
-  private Pair<SortedMap<String, Integer>, List<String>> createDummyData(
-      int numberOfFiles) throws RocksDBException, IOException {
+  /**
+   * Helper method to create dummy test data consisting of multiple SST files.
+   * Keys are distributed across files in round-robin fashion, ensuring each 
file
+   * contains a subset of the total key space for testing overlapping 
scenarios.
+   *
+   * @param numberOfFiles Number of SST files to create
+   * @return Pair containing the complete sorted key map and list of SST file 
paths
+   * @throws RocksDBException if there's an error during SST file creation
+   * @throws IOException if there's an I/O error during file operations

Review Comment:
   The method signature was changed to remove `IOException` from the `throws` 
clause, but the Javadoc still documents `@throws IOException`. Remove that line 
so the Javadoc reflects the exceptions the method actually throws.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to