ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r468843465



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +61,8 @@ public synchronized boolean hasNext() {
             return allDone();
         } else {
             next = getKeyValue();
-            iter.next();
+            if (reverse) iter.prev();

Review comment:
       separate lines 🙂 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> 
openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is 
null for key " + to);

Review comment:
       nit: `RawToKey` --> `RawLastKey`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -294,13 +305,27 @@ public void shouldIterateOverRange() {
         assertEquals(items, results.size());
     }
 
+    @Test
+    public void shouldReverseIterateOverRange() {
+        final int items = addItemsToCache();
+        final KeyValueIterator<Bytes, byte[]> range =
+            store.reverseRange(bytesKey(String.valueOf(0)), 
bytesKey(String.valueOf(items)));
+        final List<Bytes> results = new ArrayList<>();
+        while (range.hasNext()) {
+            results.add(range.next().key);
+        }
+        assertEquals(items, results.size());

Review comment:
       Can we add some tests that verify the actual contents + order of the 
reverse range? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -150,8 +166,9 @@ public void close() {
     private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, 
byte[]> {
         private final Iterator<Bytes> iter;
 
-        private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
-            this.iter = new TreeSet<>(keySet).iterator();
+        private InMemoryKeyValueIterator(final Set<Bytes> keySet, final 
boolean reverse) {
+            if (reverse) this.iter = new 
TreeSet<>(keySet).descendingIterator();
+            else this.iter = new TreeSet<>(keySet).iterator();

Review comment:
       nit: Use braces & separate lines

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##########
@@ -110,7 +111,15 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> 
entries) {
 
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes 
from, final Bytes to) {
+        return range(from, to, false);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, 
final Bytes to) {
+        return range(from, to, true);
+    }
 
+    KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to, 
final boolean reverse) {

Review comment:
       Should be private

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
##########
@@ -38,35 +38,68 @@
      *
      * @param key The key to fetch
      * @return The value or null if no value is found.
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException       If null is used for key.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     V get(K key);
 
     /**
      * Get an iterator over a given range of keys. This iterator must be 
closed after use.
      * The returned iterator must be safe from {@link 
java.util.ConcurrentModificationException}s
-     * and must not return null values. No ordering guarantees are provided.
-     * @param from The first key that could be in the range
-     * @param to The last key that could be in the range
-     * @return The iterator for this range.
-     * @throws NullPointerException If null is used for from or to.
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not 
represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration 
starts from.
+     * @param to   The last key that could be in the range, where iteration 
ends.
+     * @return The iterator for this range, from smallest to largest bytes.
+     * @throws NullPointerException       If null is used for from or to.
      * @throws InvalidStateStoreException if the store is not initialized
      */
     KeyValueIterator<K, V> range(K from, K to);
 
+    /**
+     * Get a reverse iterator over a given range of keys. This iterator must 
be closed after use.
+     * The returned iterator must be safe from {@link 
java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not 
represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration 
ends.
+     * @param to   The last key that could be in the range, where iteration 
starts from.

Review comment:
       Seems a bit tricky to say that _to_ is the variable where iteration 
starts _from_ 😉  But I can see it both ways, so being clear in the javadocs is 
good enough for me

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
##########
@@ -281,15 +281,24 @@ public boolean isEmpty() {
     }
 
     synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
-        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
true));
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
true), false);
     }
 
-    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
-        return new TreeSet<>(keySet).iterator();
+    synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes 
to) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, 
true), true);
+    }
+
+    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final 
boolean reverse) {
+        if (reverse) return new TreeSet<>(keySet).descendingIterator();

Review comment:
       nit: braces + separate lines

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> 
openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is 
null for key " + to);
         }
     }
 
     @Override
     public KeyValue<Bytes, byte[]> makeNext() {
         final KeyValue<Bytes, byte[]> next = super.makeNext();
-
         if (next == null) {
             return allDone();
         } else {
-            if (comparator.compare(next.key.get(), rawToKey) <= 0) {
-                return next;
+            if (!reverse) {
+                if (comparator.compare(next.key.get(), rawLastKey) <= 0) 
return next;
+                else return allDone();

Review comment:
       nit: braces & separate lines

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -422,6 +503,21 @@ public void 
shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
                     " Note that the built-in numerical serdes do not follow 
this for negative numbers")
             );
         }
+    }
+
+    @Test
+    public void 
shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() {

Review comment:
       Can we add tests for some other invalid range cases? For example with 
both bounds positive but from > to

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -306,26 +313,29 @@ public synchronized boolean hasNext() {
                 } else {
                     next = KeyValue.pair(new Bytes(nextWithTimestamp), 
iterWithTimestamp.value());
                     nextWithTimestamp = null;
-                    iterWithTimestamp.next();
+                    if (reverse) iterWithTimestamp.prev();

Review comment:
       braces & separate lines here and below

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -339,12 +366,24 @@ public void 
shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() {
         store.range(bytesKey("a"), bytesKey("b"));
     }
 
+    @Test(expected = InvalidStateStoreException.class)

Review comment:
       Use `assertThrows` -- we've been (slowly) migrating away from 
`@Test(expected)` in the Streams tests

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -193,22 +193,26 @@ public void prepareBatch(final List<KeyValue<Bytes, 
byte[]>> entries,
 
         @Override
         public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
-                                                     final Bytes to) {
+                                                     final Bytes to,
+                                                     final boolean reverse) {
             return new RocksDBDualCFRangeIterator(
                 name,
                 db.newIterator(newColumnFamily),
                 db.newIterator(oldColumnFamily),
                 from,
-                to);
+                to,
+                reverse);
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> all() {
+        public KeyValueIterator<Bytes, byte[]> all(final boolean reverse) {
             final RocksIterator innerIterWithTimestamp = 
db.newIterator(newColumnFamily);
-            innerIterWithTimestamp.seekToFirst();
+            if (reverse) innerIterWithTimestamp.seekToLast();

Review comment:
       braces & separate lines

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -150,6 +161,17 @@ public void 
shouldThrowUnsupportedOperationExceptionWhileRemove() {
         } catch (final UnsupportedOperationException e) { }
     }
 
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() {
+        stubOneUnderlying.put("a", "1");
+        stubOneUnderlying.put("b", "1");
+        final KeyValueIterator<String, String> keyValueIterator = 
theStore.reverseRange("a", "b");
+        try {
+            keyValueIterator.remove();

Review comment:
       use `assertThrows` here as well

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
##########
@@ -199,11 +236,44 @@ public void shouldSupportRangeAcrossMultipleKVStores() {
         cache.put("x", "x");
 
         final List<KeyValue<String, String>> results = 
toList(theStore.range("a", "e"));
+        assertArrayEquals(
+            asList(
+                new KeyValue<>("a", "a"),
+                new KeyValue<>("b", "b"),
+                new KeyValue<>("c", "c"),
+                new KeyValue<>("d", "d")
+            ).toArray(),
+            results.toArray());
+    }
+
+    @Test
+    public void shouldSupportReverseRangeAcrossMultipleKVStores() {
+        final KeyValueStore<String, String> cache = newStoreInstance();
+        stubProviderTwo.addStore(storeName, cache);
+
+        stubOneUnderlying.put("a", "a");
+        stubOneUnderlying.put("b", "b");
+        stubOneUnderlying.put("z", "z");
+
+        cache.put("c", "c");
+        cache.put("d", "d");
+        cache.put("x", "x");
+
+        final List<KeyValue<String, String>> results = 
toList(theStore.reverseRange("a", "e"));
         assertTrue(results.contains(new KeyValue<>("a", "a")));
         assertTrue(results.contains(new KeyValue<>("b", "b")));
         assertTrue(results.contains(new KeyValue<>("c", "c")));
         assertTrue(results.contains(new KeyValue<>("d", "d")));
         assertEquals(4, results.size());
+        //FIXME: order does not hold between stores, how to validate order 
here?

Review comment:
       I think the best we can do is just make sure that order is correct 
within a store. ie if `a`, `m` are all in `stubOneUnderling` then make sure the 
reverse range returns `m` before `a`.
   
   I also think it would be fine to just make sure all the expected values are 
returned without checking the order, since there are other tests to verify that 
the order within a store is correct

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
##########
@@ -29,32 +29,41 @@
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
     private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-    private final byte[] rawToKey;
+    private final byte[] rawLastKey;
+    private final boolean reverse;
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
                          final Set<KeyValueIterator<Bytes, byte[]>> 
openIterators,
                          final Bytes from,
-                         final Bytes to) {
-        super(storeName, iter, openIterators);
-        iter.seek(from.get());
-        rawToKey = to.get();
-        if (rawToKey == null) {
+                         final Bytes to,
+                         final boolean reverse) {
+        super(storeName, iter, openIterators, reverse);
+        this.reverse = reverse;
+        if (reverse) {
+            iter.seekForPrev(to.get());
+            rawLastKey = from.get();
+        } else {
+            iter.seek(from.get());
+            rawLastKey = to.get();
+        }
+        if (rawLastKey == null) {
             throw new NullPointerException("RocksDBRangeIterator: RawToKey is 
null for key " + to);

Review comment:
       Also it should be `from` for the reverse case, right? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to