smengcl commented on code in PR #8912:
URL: https://github.com/apache/ozone/pull/8912#discussion_r2331514624


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,70 +236,149 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap operations.
+   */
+  private static class HeapEntryWithFileIdx<T extends Comparable<T>>
+      implements Comparable<HeapEntryWithFileIdx<T>> {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+    // To ensure stable ordering for identical keys
+    private final int fileIndex;
+
+    HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) {
+      this.iterator = iterator;
+      this.fileIndex = fileIndex;
+      advance();
+    }
+
+    void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+
+      int result = this.currentKey.compareTo(other.currentKey);
+      if (result == 0) {
+        // For identical keys, prefer the one from the file with the higher index
+        return Integer.compare(other.fileIndex, this.fileIndex);
+      }
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj;
+      return compareTo(other) == 0;
+    }
 
-    private final Iterator<String> fileNameIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey, fileIndex);
+    }
+  }
 
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+  private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> {
+    private final Collection<String> files;
+    private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap;
+    private final List<HeapEntryWithFileIdx<T>> allIterators;
 
     private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+      this.files = files;
+      this.minHeap = new PriorityQueue<>();
+      this.allIterators = new ArrayList<>();
       init();
+      initMinHeap();
     }
 
     protected abstract void init();
 
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException;
 
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap() {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) {
-            return true;
+        int fileIndex = 0;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++);
+          allIterators.add(entry);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
           }

Review Comment:
   Good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to