ableegoldman commented on a change in pull request #9137: URL: https://github.com/apache/kafka/pull/9137#discussion_r468843465
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java ########## @@ -58,7 +61,8 @@ public synchronized boolean hasNext() { return allDone(); } else { next = getKeyValue(); - iter.next(); + if (reverse) iter.prev(); Review comment: separate lines 🙂 ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java ########## @@ -29,32 +29,41 @@ // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; - private final byte[] rawToKey; + private final byte[] rawLastKey; + private final boolean reverse; RocksDBRangeIterator(final String storeName, final RocksIterator iter, final Set<KeyValueIterator<Bytes, byte[]>> openIterators, final Bytes from, - final Bytes to) { - super(storeName, iter, openIterators); - iter.seek(from.get()); - rawToKey = to.get(); - if (rawToKey == null) { + final Bytes to, + final boolean reverse) { + super(storeName, iter, openIterators, reverse); + this.reverse = reverse; + if (reverse) { + iter.seekForPrev(to.get()); + rawLastKey = from.get(); + } else { + iter.seek(from.get()); + rawLastKey = to.get(); + } + if (rawLastKey == null) { throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); Review comment: nit: `RawToKey` --> `RawLastKey` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ########## @@ -294,13 +305,27 @@ public void shouldIterateOverRange() { assertEquals(items, results.size()); } + @Test + public void shouldReverseIterateOverRange() { + final int items = addItemsToCache(); + final KeyValueIterator<Bytes, byte[]> range = + store.reverseRange(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items))); + final List<Bytes> results = new ArrayList<>(); + while (range.hasNext()) { + 
results.add(range.next().key); + } + assertEquals(items, results.size()); Review comment: Can we add some tests that verify the actual contents + order of the reverse range? ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ########## @@ -150,8 +166,9 @@ public void close() { private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> { private final Iterator<Bytes> iter; - private InMemoryKeyValueIterator(final Set<Bytes> keySet) { - this.iter = new TreeSet<>(keySet).iterator(); + private InMemoryKeyValueIterator(final Set<Bytes> keySet, final boolean reverse) { + if (reverse) this.iter = new TreeSet<>(keySet).descendingIterator(); + else this.iter = new TreeSet<>(keySet).iterator(); Review comment: nit: Use braces & separate lines ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java ########## @@ -110,7 +111,15 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { @Override public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { + return range(from, to, false); + } + + @Override + public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, final Bytes to) { + return range(from, to, true); + } + KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to, final boolean reverse) { Review comment: Should be private ########## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java ########## @@ -38,35 +38,68 @@ * * @param key The key to fetch * @return The value or null if no value is found. - * @throws NullPointerException If null is used for key. + * @throws NullPointerException If null is used for key. * @throws InvalidStateStoreException if the store is not initialized */ V get(K key); /** * Get an iterator over a given range of keys. This iterator must be closed after use. 
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s - * and must not return null values. No ordering guarantees are provided. - * @param from The first key that could be in the range - * @param to The last key that could be in the range - * @return The iterator for this range. - * @throws NullPointerException If null is used for from or to. + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * + * @param from The first key that could be in the range, where iteration starts from. + * @param to The last key that could be in the range, where iteration ends. + * @return The iterator for this range, from smallest to largest bytes. + * @throws NullPointerException If null is used for from or to. * @throws InvalidStateStoreException if the store is not initialized */ KeyValueIterator<K, V> range(K from, K to); + /** + * Get a reverse iterator over a given range of keys. This iterator must be closed after use. + * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * + * @param from The first key that could be in the range, where iteration ends. + * @param to The last key that could be in the range, where iteration starts from. 
Review comment: Seems a bit tricky to say that _to_ is the variable where iteration starts _from_ 😉 But I can see it both ways, so being clear in the javadocs is good enough for me ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java ########## @@ -281,15 +281,24 @@ public boolean isEmpty() { } synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) { - return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); + return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), false); } - private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) { - return new TreeSet<>(keySet).iterator(); + synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) { + return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true); + } + + private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final boolean reverse) { + if (reverse) return new TreeSet<>(keySet).descendingIterator(); Review comment: nit: braces + separate lines ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java ########## @@ -29,32 +29,41 @@ // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. 
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; - private final byte[] rawToKey; + private final byte[] rawLastKey; + private final boolean reverse; RocksDBRangeIterator(final String storeName, final RocksIterator iter, final Set<KeyValueIterator<Bytes, byte[]>> openIterators, final Bytes from, - final Bytes to) { - super(storeName, iter, openIterators); - iter.seek(from.get()); - rawToKey = to.get(); - if (rawToKey == null) { + final Bytes to, + final boolean reverse) { + super(storeName, iter, openIterators, reverse); + this.reverse = reverse; + if (reverse) { + iter.seekForPrev(to.get()); + rawLastKey = from.get(); + } else { + iter.seek(from.get()); + rawLastKey = to.get(); + } + if (rawLastKey == null) { throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); } } @Override public KeyValue<Bytes, byte[]> makeNext() { final KeyValue<Bytes, byte[]> next = super.makeNext(); - if (next == null) { return allDone(); } else { - if (comparator.compare(next.key.get(), rawToKey) <= 0) { - return next; + if (!reverse) { + if (comparator.compare(next.key.get(), rawLastKey) <= 0) return next; + else return allDone(); Review comment: nit: braces & separate lines ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ########## @@ -422,6 +503,21 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { " Note that the built-in numerical serdes do not follow this for negative numbers") ); } + } + + @Test + public void shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() { Review comment: Can we add tests for some other invalid range cases? 
For example with both bounds positive but from > to ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java ########## @@ -306,26 +313,29 @@ public synchronized boolean hasNext() { } else { next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value()); nextWithTimestamp = null; - iterWithTimestamp.next(); + if (reverse) iterWithTimestamp.prev(); Review comment: braces & separate lines here and below ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ########## @@ -339,12 +366,24 @@ public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() { store.range(bytesKey("a"), bytesKey("b")); } + @Test(expected = InvalidStateStoreException.class) Review comment: Use `assertThrows` -- we've been (slowly) migrating away from `@Test(expected)` in the Streams tests ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java ########## @@ -193,22 +193,26 @@ public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries, @Override public KeyValueIterator<Bytes, byte[]> range(final Bytes from, - final Bytes to) { + final Bytes to, + final boolean reverse) { return new RocksDBDualCFRangeIterator( name, db.newIterator(newColumnFamily), db.newIterator(oldColumnFamily), from, - to); + to, + reverse); } @Override - public KeyValueIterator<Bytes, byte[]> all() { + public KeyValueIterator<Bytes, byte[]> all(final boolean reverse) { final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily); - innerIterWithTimestamp.seekToFirst(); + if (reverse) innerIterWithTimestamp.seekToLast(); Review comment: braces & separate lines ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ########## @@ -150,6 +161,17 @@ public void shouldThrowUnsupportedOperationExceptionWhileRemove() { } catch (final 
UnsupportedOperationException e) { } } + @Test + public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() { + stubOneUnderlying.put("a", "1"); + stubOneUnderlying.put("b", "1"); + final KeyValueIterator<String, String> keyValueIterator = theStore.reverseRange("a", "b"); + try { + keyValueIterator.remove(); Review comment: Use `assertThrows` here as well ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ########## @@ -199,11 +236,44 @@ public void shouldSupportRangeAcrossMultipleKVStores() { cache.put("x", "x"); final List<KeyValue<String, String>> results = toList(theStore.range("a", "e")); + assertArrayEquals( + asList( + new KeyValue<>("a", "a"), + new KeyValue<>("b", "b"), + new KeyValue<>("c", "c"), + new KeyValue<>("d", "d") + ).toArray(), + results.toArray()); + } + + @Test + public void shouldSupportReverseRangeAcrossMultipleKVStores() { + final KeyValueStore<String, String> cache = newStoreInstance(); + stubProviderTwo.addStore(storeName, cache); + + stubOneUnderlying.put("a", "a"); + stubOneUnderlying.put("b", "b"); + stubOneUnderlying.put("z", "z"); + + cache.put("c", "c"); + cache.put("d", "d"); + cache.put("x", "x"); + + final List<KeyValue<String, String>> results = toList(theStore.reverseRange("a", "e")); assertTrue(results.contains(new KeyValue<>("a", "a"))); assertTrue(results.contains(new KeyValue<>("b", "b"))); assertTrue(results.contains(new KeyValue<>("c", "c"))); assertTrue(results.contains(new KeyValue<>("d", "d"))); assertEquals(4, results.size()); + //FIXME: order does not hold between stores, how to validate order here? Review comment: I think the best we can do is just make sure that the order is correct within a store, i.e., if `a` and `m` are both in `stubOneUnderlying`, then make sure the reverse range returns `m` before `a`.
I also think it would be fine to just make sure all the expected values are returned without checking the order, since there are other tests to verify that the order within a store is correct ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java ########## @@ -29,32 +29,41 @@ // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; - private final byte[] rawToKey; + private final byte[] rawLastKey; + private final boolean reverse; RocksDBRangeIterator(final String storeName, final RocksIterator iter, final Set<KeyValueIterator<Bytes, byte[]>> openIterators, final Bytes from, - final Bytes to) { - super(storeName, iter, openIterators); - iter.seek(from.get()); - rawToKey = to.get(); - if (rawToKey == null) { + final Bytes to, + final boolean reverse) { + super(storeName, iter, openIterators, reverse); + this.reverse = reverse; + if (reverse) { + iter.seekForPrev(to.get()); + rawLastKey = from.get(); + } else { + iter.seek(from.get()); + rawLastKey = to.get(); + } + if (rawLastKey == null) { throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); Review comment: Also it should be `from` for the reverse case, right? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org