smengcl commented on code in PR #8912: URL: https://github.com/apache/ozone/pull/8912#discussion_r2331514624
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,70 +236,149 @@ public String next() {
     }
   }
-  private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap operations.
+   */
+  private static class HeapEntryWithFileIdx<T extends Comparable<T>>
+      implements Comparable<HeapEntryWithFileIdx<T>> {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+    // To ensure stable ordering for identical keys
+    private final int fileIndex;
+
+    HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) {
+      this.iterator = iterator;
+      this.fileIndex = fileIndex;
+      advance();
+    }
+
+    void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+
+      int result = this.currentKey.compareTo(other.currentKey);
+      if (result == 0) {
+        // For identical keys, prefer the one from the file with the higher index
+        return Integer.compare(other.fileIndex, this.fileIndex);
+      }
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj;
+      return compareTo(other) == 0;
+    }
-    private final Iterator<String> fileNameIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey, fileIndex);
+    }
+  }
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+  private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> {
+    private final Collection<String> files;
+    private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap;
+    private final List<HeapEntryWithFileIdx<T>> allIterators;
     private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+      this.files = files;
+      this.minHeap = new PriorityQueue<>();
+      this.allIterators = new ArrayList<>();
       init();
+      initMinHeap();
     }
     protected abstract void init();
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException;
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap() {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) {
-            return true;
+        int fileIndex = 0;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++);
+          allIterators.add(entry);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
           }

Review Comment:
   Good catch

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org