smengcl commented on code in PR #8912:
URL: https://github.com/apache/ozone/pull/8912#discussion_r2314482585


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,70 +236,180 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements 
ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap 
operations.
+   */
+  private static class HeapEntryWithFileIdx<T extends Comparable<T>>
+      implements Comparable<HeapEntryWithFileIdx<T>> {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+    // To ensure stable ordering for identical keys
+    private final int fileIndex;
+
+    HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) {
+      this.iterator = iterator;
+      this.fileIndex = fileIndex;
+      advance();
+    }
+
+    void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+
+      int result = this.currentKey.compareTo(other.currentKey);
+      if (result == 0) {
+        // For identical keys, prefer the one from the file with the higher 
index
+        return Integer.compare(other.fileIndex, this.fileIndex);
+      }
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj;
+
+      if (this.currentKey == null && other.currentKey == null) {
+        return this.fileIndex == other.fileIndex;
+      }
+      if (this.currentKey == null || other.currentKey == null) {
+        return false;
+      }
+
+      return this.currentKey.equals(other.currentKey) && this.fileIndex == 
other.fileIndex;
+    }
 
-    private final Iterator<String> fileNameIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey, fileIndex);
+    }
+  }
 
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+  private abstract static class MultipleSstFileIterator<T extends 
Comparable<T>> implements ClosableIterator<T> {
+    private final Collection<String> files;
+    private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap;
+    private final List<HeapEntryWithFileIdx<T>> allIterators;
+    private T lastReturnedValue;
 
     private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+      this.files = files;
+      this.minHeap = new PriorityQueue<>();
+      this.allIterators = new ArrayList<>();
+      this.lastReturnedValue = null;
       init();
+      initMinHeap();
     }
 
     protected abstract void init();
 
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) 
throws RocksDBException, IOException;
 
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap() {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && 
currentFileIterator.hasNext()) {
-            return true;
+        int fileIndex = 0;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, 
fileIndex++);
+          allIterators.add(entry);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
           }
-        } while (moveToNextFile());
+        }
       } catch (IOException | RocksDBException e) {
-        // TODO: [Snapshot] This exception has to be handled by the caller.
-        //  We have to do better exception handling.
-        throw new RuntimeException(e);
+        // Clean up any opened iterators
+        close();
+        throw new RuntimeException("Failed to initialize SST file iterators", 
e);
       }
-      return false;
     }
 
     @Override
-    public T next() {
-      if (hasNext()) {
-        return currentFileIterator.next();
+    public boolean hasNext() {
+      // Skip duplicates, keep advancing until we find a different key or run 
out of entries
+      while (!minHeap.isEmpty()) {

Review Comment:
   Yup, that's better. Let me make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to