fonsdant commented on code in PR #18610: URL: https://github.com/apache/kafka/pull/18610#discussion_r2017793076
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ########## @@ -277,197 +275,361 @@ public void close() { } } - private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> - implements ManagedKeyValueIterator<Bytes, byte[]> { - - // RocksDB's JNI interface does not expose getters/setters that allow the - // comparator to be pluggable, and the default is lexicographic, so it's - // safe to just force lexicographic comparator here for now. + /** + * A range-based iterator for RocksDB that merges results from two column families. + * + * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and + * another containing non-timestamped values. It ensures that the keys from both column families are merged and + * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range + * boundaries.</p> + * + * <h2>Key Features</h2> + * + * <ul> + * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li> + * <li>Supports range-based queries with open or closed boundaries.</li> + * <li>Handles both forward and reverse iteration seamlessly.</li> + * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li> + * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li> + * </ul> + * + * <h2>Usage</h2> + * + * <p>The iterator can be used for different types of range-based operations, such as: + * <ul> + * <li>Iterating over all keys within a range.</li> + * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li> + * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li> + * </ul> + * </p> + * + * <h2>Implementation Details</h2> + * + * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's + * native iterators for efficient traversal of keys within the specified range. Keys from the two column families + * are merged during iteration, ensuring proper order and de-duplication where applicable.</p> + * + * <h3>Key Methods:</h3> + * + * <ul> + * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring + * the result is within the specified range and boundary conditions.</li> + * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li> + * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li> + * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li> + * </ul> + * + * <h3>Thread Safety:</h3> + * + * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple + * threads without external synchronization.</p> Review Comment: Updated. ########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ########## @@ -277,197 +275,361 @@ public void close() { } } - private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> - implements ManagedKeyValueIterator<Bytes, byte[]> { - - // RocksDB's JNI interface does not expose getters/setters that allow the - // comparator to be pluggable, and the default is lexicographic, so it's - // safe to just force lexicographic comparator here for now. + /** + * A range-based iterator for RocksDB that merges results from two column families. + * + * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and + * another containing non-timestamped values. It ensures that the keys from both column families are merged and + * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range + * boundaries.</p> + * + * <h2>Key Features</h2> + * + * <ul> + * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li> + * <li>Supports range-based queries with open or closed boundaries.</li> + * <li>Handles both forward and reverse iteration seamlessly.</li> + * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li> + * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li> + * </ul> + * + * <h2>Usage</h2> + * + * <p>The iterator can be used for different types of range-based operations, such as: + * <ul> + * <li>Iterating over all keys within a range.</li> + * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li> + * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li> + * </ul> + * </p> + * + * <h2>Implementation Details</h2> + * + * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's + * native iterators for efficient traversal of keys within the specified range. Keys from the two column families + * are merged during iteration, ensuring proper order and de-duplication where applicable.</p> + * + * <h3>Key Methods:</h3> + * + * <ul> + * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring + * the result is within the specified range and boundary conditions.</li> + * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li> + * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li> + * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li> + * </ul> + * + * <h3>Thread Safety:</h3> + * + * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple + * threads without external synchronization.</p> + * + * <h2>Examples</h2> + * + * <h3>Iterate over a range:</h3> + * + * <pre>{@code + * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily); + * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily); + * + * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator( + * new Bytes("keyStart".getBytes()), + * new Bytes("keyEnd".getBytes()), + * noTimestampIterator, + * withTimestampIterator, + * "storeName", + * true, // Forward iteration + * true // Inclusive upper boundary + * )) { + * while (iterator.hasNext()) { + * KeyValue<Bytes, byte[]> entry = iterator.next(); + * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value)); + * } + * } + * }</pre> + * + * <h2>Exceptions</h2> + * + * <ul> + * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li> + * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li> + * </ul> + * + * @see AbstractIterator + * @see ManagedKeyValueIterator + * @see RocksDBStore + */ + private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> { + private Runnable closeCallback; + private byte[] noTimestampNext; + private byte[] withTimestampNext; private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; - + private final RocksIterator noTimestampIterator; + private final RocksIterator withTimestampIterator; private final String storeName; - private final RocksIterator iterWithTimestamp; - private final RocksIterator iterNoTimestamp; private final boolean forward; - + private final boolean toInclusive; + private final byte[] rawLastKey; private volatile boolean open = true; - private byte[] nextWithTimestamp; - private byte[] nextNoTimestamp; - private KeyValue<Bytes, byte[]> next; - private Runnable closeCallback = null; - - RocksDBDualCFIterator(final String storeName, - final RocksIterator iterWithTimestamp, - final RocksIterator iterNoTimestamp, - final boolean forward) { - this.iterWithTimestamp = iterWithTimestamp; - this.iterNoTimestamp = iterNoTimestamp; - this.storeName = storeName; + /** + * Constructs a new {@code RocksDBDualCFRangeIterator}. + * + * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up + * the range and direction for iteration.</p> + * + * @param from The starting key of the range. Can be {@code null} for an open range. + * @param to The ending key of the range. Can be {@code null} for an open range. + * @param noTimestampIterator The iterator for the non-timestamped column family. + * @param withTimestampIterator The iterator for the timestamped column family. + * @param storeName The name of the store associated with this iterator. + * @param forward {@code true} for forward iteration; {@code false} for reverse iteration. + * @param toInclusive Whether the upper boundary of the range is inclusive. + */ + RocksDBDualCFRangeIterator(final Bytes from, + final Bytes to, + final RocksIterator noTimestampIterator, + final RocksIterator withTimestampIterator, + final String storeName, + final boolean forward, + final boolean toInclusive) { this.forward = forward; + this.noTimestampIterator = noTimestampIterator; + this.storeName = storeName; + this.toInclusive = toInclusive; + this.withTimestampIterator = withTimestampIterator; + + this.rawLastKey = initializeIterators(from, to); } + /** + * Retrieves the next key-value pair in the range. + * + * <p>This method determines the next key-value pair to return by merging the results from the two column + * families. If both column families have keys, it selects the one that matches the iteration order and range + * conditions. Keys outside the specified range are skipped.</p> + * + * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available. + */ @Override - public synchronized boolean hasNext() { - if (!open) { - throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName)); - } - return super.hasNext(); + protected KeyValue<Bytes, byte[]> makeNext() { + loadNextKeys(); + if (noTimestampNext == null && withTimestampNext == null) return allDone(); + final KeyValue<Bytes, byte[]> next = fetchNextKeyValue(); + return isInRange(next) ? next : allDone(); + } + + /** + * Returns the next key in the range without advancing the iterator. + * + * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()}, + * without moving the iterator forward. This is useful for inspecting the next key without affecting the + * iterator's state.</p> + * + * @return The next key as a {@link Bytes} object. + * + * @throws NoSuchElementException If there are no more elements in the iterator. + */ + @Override + public Bytes peekNextKey() { Review Comment: In progress. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org