Copilot commented on code in PR #8912: URL: https://github.com/apache/ozone/pull/8912#discussion_r2346029798
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java: ########## @@ -233,69 +235,145 @@ public String next() { } } - private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> { + /** + * A wrapper class that holds an iterator and its current value for heap operations. + */ + private static class HeapEntry<T extends Comparable<T>> + implements Comparable<HeapEntry<T>>, Closeable { + private final ClosableIterator<T> iterator; + private T currentKey; + + HeapEntry(ClosableIterator<T> iterator) { + this.iterator = iterator; + advance(); + } + + @Override + public void close() { + iterator.close(); + } + + boolean advance() { + if (iterator.hasNext()) { + currentKey = iterator.next(); + return true; + } else { + currentKey = null; + return false; + } + } + + T getCurrentKey() { + return currentKey; + } + + @Override + public int compareTo(@Nonnull HeapEntry<T> other) { + if (this.currentKey == null && other.currentKey == null) { + return 0; + } + if (this.currentKey == null) { + return 1; + } + if (other.currentKey == null) { + return -1; + } + return this.currentKey.compareTo(other.currentKey); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } - private final Iterator<String> fileNameIterator; + HeapEntry<T> other = (HeapEntry<T>) obj; + return compareTo(other) == 0; + } - private String currentFile; - private ClosableIterator<T> currentFileIterator; + @Override + public int hashCode() { + return Objects.hash(iterator, currentKey); + } + } - private MultipleSstFileIterator(Collection<String> files) { - this.fileNameIterator = files.iterator(); + /** + * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files. + * It uses a PriorityQueue to merge keys from all files in sorted order. 
+ * Each file's iterator is wrapped in a HeapEntryWithFileIdx object, + * which ensures stable ordering for identical keys by considering the file index. + * @param <T> + */ + private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> { + private final PriorityQueue<HeapEntry<T>> minHeap; + + private MultipleSstFileIterator(Collection<String> sstFiles) { + this.minHeap = new PriorityQueue<>(); init(); + initMinHeap(sstFiles); } protected abstract void init(); protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException; - @Override - public boolean hasNext() { + private void initMinHeap(Collection<String> files) { try { - do { - if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) { - return true; + for (String file : files) { + ClosableIterator<T> iterator = getKeyIteratorForFile(file); + HeapEntry<T> entry = new HeapEntry<>(iterator); + + if (entry.getCurrentKey() != null) { + minHeap.offer(entry); + } else { + // No valid entries, close the iterator + entry.close(); } - } while (moveToNextFile()); + } } catch (IOException | RocksDBException e) { - // TODO: [Snapshot] This exception has to be handled by the caller. - // We have to do better exception handling. 
- throw new RuntimeException(e); + // Clean up any opened iterators + close(); + throw new RuntimeException("Failed to initialize SST file iterators", e); } - return false; } @Override - public T next() { - if (hasNext()) { - return currentFileIterator.next(); - } - throw new NoSuchElementException("No more elements found."); + public boolean hasNext() { + return !minHeap.isEmpty(); } @Override - public void close() throws UncheckedIOException { - try { - closeCurrentFile(); - } catch (IOException e) { - throw new UncheckedIOException(e); + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements found."); } - } - private boolean moveToNextFile() throws IOException, RocksDBException { - if (fileNameIterator.hasNext()) { - closeCurrentFile(); - currentFile = fileNameIterator.next(); - this.currentFileIterator = getKeyIteratorForFile(currentFile); - return true; + assert minHeap.peek() != null; + // Get current key from heap + HeapEntry<T> currentKey = minHeap.peek(); + + // Advance all entries with the same key (from different files) + // and keep the one with the highest file index + while (!minHeap.isEmpty() && Objects.equals(minHeap.peek(), currentKey)) { Review Comment: The comment mentions keeping 'the one with the highest file index' but the code doesn't implement this logic. The current implementation advances all entries with the same key but doesn't actually select based on file index. There's no mechanism to track or compare file indices in the HeapEntry class. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java: ########## @@ -233,69 +235,145 @@ public String next() { } } - private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> { + /** + * A wrapper class that holds an iterator and its current value for heap operations. 
+ */ private static class HeapEntry<T extends Comparable<T>> + implements Comparable<HeapEntry<T>>, Closeable { + private final ClosableIterator<T> iterator; + private T currentKey; + + HeapEntry(ClosableIterator<T> iterator) { + this.iterator = iterator; + advance(); + } + + @Override + public void close() { + iterator.close(); + } + + boolean advance() { + if (iterator.hasNext()) { + currentKey = iterator.next(); + return true; + } else { + currentKey = null; + return false; + } + } + + T getCurrentKey() { + return currentKey; + } + + @Override + public int compareTo(@Nonnull HeapEntry<T> other) { + if (this.currentKey == null && other.currentKey == null) { + return 0; + } + if (this.currentKey == null) { + return 1; + } + if (other.currentKey == null) { + return -1; + } + return this.currentKey.compareTo(other.currentKey); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } - private final Iterator<String> fileNameIterator; + HeapEntry<T> other = (HeapEntry<T>) obj; Review Comment: Unchecked cast should be avoided. Add a type check before casting or use @SuppressWarnings("unchecked") if the cast is guaranteed to be safe. ########## hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestSstFileSetReader.java: ########## @@ -87,14 +98,32 @@ private String createRandomSSTFile(TreeMap<String, Integer> keys) return file.getAbsolutePath(); } + /** + * Helper method to create a map of keys with values alternating between 0 and 1. + * Keys with even indices get value 0 (will be treated as deletions/tombstones), + * keys with odd indices get value 1 (will be treated as regular entries).
+ * + * @param startRange Starting range for key generation (inclusive) + * @param endRange Ending range for key generation (exclusive) + * @return Map of keys with alternating 0/1 values + */ private Map<String, Integer> createKeys(int startRange, int endRange) { return IntStream.range(startRange, endRange).boxed() .collect(Collectors.toMap(i -> KEY_PREFIX + i, i -> i % 2)); } - private Pair<SortedMap<String, Integer>, List<String>> createDummyData( - int numberOfFiles) throws RocksDBException, IOException { + /** + * Helper method to create dummy test data consisting of multiple SST files. + * Keys are distributed across files in round-robin fashion, ensuring each file + * contains a subset of the total key space for testing overlapping scenarios. + * + * @param numberOfFiles Number of SST files to create + * @return Pair containing the complete sorted key map and list of SST file paths + * @throws RocksDBException if there's an error during SST file creation + * @throws IOException if there's an I/O error during file operations Review Comment: The method signature was changed to remove IOException from the throws clause, but the JavaDoc still mentions it can throw IOException. The JavaDoc should be updated to reflect the actual exceptions thrown. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org