fonsdant commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r2017793076


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
         }
     }
 
-    private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements ManagedKeyValueIterator<Bytes, byte[]> {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
+    /**
+     * A range-based iterator for RocksDB that merges results from two column families.
+     *
+     * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+     * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+     * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+     * boundaries.</p>
+     *
+     * <h2>Key Features</h2>
+     *
+     * <ul>
+     *     <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+     *     <li>Supports range-based queries with open or closed boundaries.</li>
+     *     <li>Handles both forward and reverse iteration seamlessly.</li>
+     *     <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+     *     <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+     * </ul>
+     *
+     * <h2>Usage</h2>
+     *
+     * <p>The iterator can be used for different types of range-based operations, such as:
+     * <ul>
+     *     <li>Iterating over all keys within a range.</li>
+     *     <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+     *     <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+     * </ul>
+     * </p>
+     *
+     * <h2>Implementation Details</h2>
+     *
+     * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+     * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+     * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+     *
+     * <h3>Key Methods:</h3>
+     *
+     * <ul>
+     *     <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+     *     the result is within the specified range and boundary conditions.</li>
+     *     <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+     *     <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+     *     <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+     * </ul>
+     *
+     * <h3>Thread Safety:</h3>
+     *
+     * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+     * threads without external synchronization.</p>

Review Comment:
   Updated.
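
   For readers of this thread, "external synchronization" in the Thread Safety paragraph above can be pictured as follows. This is only a minimal sketch, not code from the PR: `SharedIteratorSketch`, `drain`, and the plain `Iterator<String>` standing in for the store iterator are all hypothetical names chosen for illustration. The point it shows is that the `hasNext()`/`next()` pair has to run under one lock, because checking and then advancing is a two-step operation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class SharedIteratorSketch {

    // Drains one shared iterator from several threads. The caller-side lock
    // makes the hasNext()/next() pair atomic, which the iterator itself does not.
    static List<String> drain(final Iterator<String> shared, final int threads) {
        final Object lock = new Object();
        final List<String> seen = Collections.synchronizedList(new ArrayList<>());
        final List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < threads; i++) {
            final Thread worker = new Thread(() -> {
                while (true) {
                    final String key;
                    synchronized (lock) {
                        if (!shared.hasNext()) {
                            return; // nothing left, stop this worker
                        }
                        key = shared.next();
                    }
                    seen.add(key); // work on the element outside the lock
                }
            });
            workers.add(worker);
            worker.start();
        }

        for (final Thread worker : workers) {
            try {
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while draining", e);
            }
        }
        return new ArrayList<>(seen);
    }
}
```

   Without the lock, two threads could both observe `hasNext() == true` for the last element and one of them would then fail in `next()`.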



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -277,197 +275,361 @@ public void close() {
         }
     }
 
-    private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements ManagedKeyValueIterator<Bytes, byte[]> {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
+    /**
+     * A range-based iterator for RocksDB that merges results from two column families.
+     *
+     * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+     * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+     * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+     * boundaries.</p>
+     *
+     * <h2>Key Features</h2>
+     *
+     * <ul>
+     *     <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
+     *     <li>Supports range-based queries with open or closed boundaries.</li>
+     *     <li>Handles both forward and reverse iteration seamlessly.</li>
+     *     <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
+     *     <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
+     * </ul>
+     *
+     * <h2>Usage</h2>
+     *
+     * <p>The iterator can be used for different types of range-based operations, such as:
+     * <ul>
+     *     <li>Iterating over all keys within a range.</li>
+     *     <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
+     *     <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
+     * </ul>
+     * </p>
+     *
+     * <h2>Implementation Details</h2>
+     *
+     * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+     * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+     * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
+     *
+     * <h3>Key Methods:</h3>
+     *
+     * <ul>
+     *     <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
+     *     the result is within the specified range and boundary conditions.</li>
+     *     <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
+     *     <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
+     *     <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
+     * </ul>
+     *
+     * <h3>Thread Safety:</h3>
+     *
+     * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+     * threads without external synchronization.</p>
+     *
+     * <h2>Examples</h2>
+     *
+     * <h3>Iterate over a range:</h3>
+     *
+     * <pre>{@code
+     * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+     * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+     *
+     * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+     *         new Bytes("keyStart".getBytes()),
+     *         new Bytes("keyEnd".getBytes()),
+     *         noTimestampIterator,
+     *         withTimestampIterator,
+     *         "storeName",
+     *         true,  // Forward iteration
+     *         true   // Inclusive upper boundary
+     * )) {
+     *     while (iterator.hasNext()) {
+     *         KeyValue<Bytes, byte[]> entry = iterator.next();
+     *         System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+     *     }
+     * }
+     * }</pre>
+     *
+     * <h2>Exceptions</h2>
+     *
+     * <ul>
+     *     <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
+     *     <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
+     * </ul>
+     *
+     * @see AbstractIterator
+     * @see ManagedKeyValueIterator
+     * @see RocksDBStore
+     */
+    private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements ManagedKeyValueIterator<Bytes, byte[]> {
+        private Runnable closeCallback;
+        private byte[] noTimestampNext;
+        private byte[] withTimestampNext;
         private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+        private final RocksIterator noTimestampIterator;
+        private final RocksIterator withTimestampIterator;
         private final String storeName;
-        private final RocksIterator iterWithTimestamp;
-        private final RocksIterator iterNoTimestamp;
         private final boolean forward;
-
+        private final boolean toInclusive;
+        private final byte[] rawLastKey;
         private volatile boolean open = true;
 
-        private byte[] nextWithTimestamp;
-        private byte[] nextNoTimestamp;
-        private KeyValue<Bytes, byte[]> next;
-        private Runnable closeCallback = null;
-
-        RocksDBDualCFIterator(final String storeName,
-                              final RocksIterator iterWithTimestamp,
-                              final RocksIterator iterNoTimestamp,
-                              final boolean forward) {
-            this.iterWithTimestamp = iterWithTimestamp;
-            this.iterNoTimestamp = iterNoTimestamp;
-            this.storeName = storeName;
+        /**
+         * Constructs a new {@code RocksDBDualCFRangeIterator}.
+         *
+         * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
+         * the range and direction for iteration.</p>
+         *
+         * @param from                  The starting key of the range. Can be {@code null} for an open range.
+         * @param to                    The ending key of the range. Can be {@code null} for an open range.
+         * @param noTimestampIterator   The iterator for the non-timestamped column family.
+         * @param withTimestampIterator The iterator for the timestamped column family.
+         * @param storeName             The name of the store associated with this iterator.
+         * @param forward               {@code true} for forward iteration; {@code false} for reverse iteration.
+         * @param toInclusive           Whether the upper boundary of the range is inclusive.
+         */
+        RocksDBDualCFRangeIterator(final Bytes from,
+                                   final Bytes to,
+                                   final RocksIterator noTimestampIterator,
+                                   final RocksIterator withTimestampIterator,
+                                   final String storeName,
+                                   final boolean forward,
+                                   final boolean toInclusive) {
             this.forward = forward;
+            this.noTimestampIterator = noTimestampIterator;
+            this.storeName = storeName;
+            this.toInclusive = toInclusive;
+            this.withTimestampIterator = withTimestampIterator;
+
+            this.rawLastKey = initializeIterators(from, to);
         }
 
+        /**
+         * Retrieves the next key-value pair in the range.
+         *
+         * <p>This method determines the next key-value pair to return by merging the results from the two column
+         * families. If both column families have keys, it selects the one that matches the iteration order and range
+         * conditions. Keys outside the specified range are skipped.</p>
+         *
+         * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
+         */
         @Override
-        public synchronized boolean hasNext() {
-            if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
-            }
-            return super.hasNext();
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            loadNextKeys();
+            if (noTimestampNext == null && withTimestampNext == null) return allDone();
+            final KeyValue<Bytes, byte[]> next = fetchNextKeyValue();
+            return isInRange(next) ? next : allDone();
+        }
+
+        /**
+         * Returns the next key in the range without advancing the iterator.
+         *
+         * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
+         * without moving the iterator forward. This is useful for inspecting the next key without affecting the
+         * iterator's state.</p>
+         *
+         * @return The next key as a {@link Bytes} object.
+         *
+         * @throws NoSuchElementException If there are no more elements in the iterator.
+         */
+        @Override
+        public Bytes peekNextKey() {

Review Comment:
   In progress.
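
   While this is in progress, the contract documented on `peekNextKey()` above (return the next key without advancing, throw `NoSuchElementException` when exhausted) can be pictured with the sketch below. It is a simplified stand-in, not the PR's implementation: `PeekingMergeSketch` is a hypothetical class, `String` keys replace the raw `byte[]` keys, two sorted lists replace the two RocksDB column-family iterators, and sending ties to the left source is an arbitrary choice for the sketch.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration only: a two-way merge over sorted sources with peek support.
// Both inputs must already be sorted in the iteration direction and must not contain null,
// because null is used here to mark an empty buffer slot.
final class PeekingMergeSketch implements Iterator<String> {
    private final Iterator<String> left;
    private final Iterator<String> right;
    private final boolean forward;
    private String leftNext;   // buffered head of the left source, null if none buffered
    private String rightNext;  // buffered head of the right source, null if none buffered
    private String buffered;   // element already chosen but not yet handed out by next()

    PeekingMergeSketch(final List<String> left, final List<String> right, final boolean forward) {
        this.left = left.iterator();
        this.right = right.iterator();
        this.forward = forward;
    }

    @Override
    public boolean hasNext() {
        if (buffered != null) {
            return true;
        }
        // Refill whichever head slot is empty.
        if (leftNext == null && left.hasNext()) {
            leftNext = left.next();
        }
        if (rightNext == null && right.hasNext()) {
            rightNext = right.next();
        }
        if (leftNext == null && rightNext == null) {
            return false;
        }
        final boolean takeLeft;
        if (rightNext == null) {
            takeLeft = true;
        } else if (leftNext == null) {
            takeLeft = false;
        } else {
            final int cmp = leftNext.compareTo(rightNext);
            // Forward iteration emits the smaller key first, reverse the larger one.
            takeLeft = forward ? cmp <= 0 : cmp >= 0;
        }
        if (takeLeft) {
            buffered = leftNext;
            leftNext = null;
        } else {
            buffered = rightNext;
            rightNext = null;
        }
        return true;
    }

    // Returns the next key without consuming it; repeated calls return the same key.
    public String peekNextKey() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return buffered;
    }

    @Override
    public String next() {
        final String result = peekNextKey();
        buffered = null; // consume the buffered element
        return result;
    }
}
```

   Because `peekNextKey()` only fills the buffer and `next()` is the only method that clears it, peeking any number of times leaves the iteration order unchanged.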


