ableegoldman commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r470311062



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -29,7 +29,7 @@
                           final RocksIterator newIterator,
                           final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                           final Bytes prefix) {
-        super(name, newIterator, openIterators);
+        super(name, newIterator, openIterators, false);

Review comment:
       This class is still unused, so maybe we should just take it out.
Someone is working on a prefix seek KIP at the moment, so it will be replaced
soon anyway.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       We need to branch on `reverse` here too, right?
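
A minimal sketch of what that branch could look like, assuming `lastKey` holds the final key in the direction of iteration (the upper bound when iterating forward, the lower bound in reverse). The `withinBound` helper and the class name are hypothetical and not part of the PR.

```java
import java.util.Comparator;

final class BoundCheckSketch {

    /**
     * Returns true while {@code key} has not yet passed {@code lastKey}.
     * Assumption: lastKey is the upper bound for forward iteration and
     * the lower bound for reverse iteration.
     */
    static <K> boolean withinBound(final Comparator<K> comparator,
                                   final K key,
                                   final K lastKey,
                                   final boolean reverse) {
        final int cmp = comparator.compare(key, lastKey);
        // Forward: stop once key > lastKey. Reverse: stop once key < lastKey.
        return reverse ? cmp >= 0 : cmp <= 0;
    }

    public static void main(final String[] args) {
        final Comparator<String> natural = Comparator.naturalOrder();
        System.out.println(withinBound(natural, "b", "c", false));
        System.out.println(withinBound(natural, "a", "b", true));
    }
}
```

With only the `<= 0` check, a reverse iterator whose bound is the lower key would never stop at that bound.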

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -306,26 +319,41 @@ public synchronized boolean hasNext() {
                 } else {
                     next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
                     nextWithTimestamp = null;
-                    iterWithTimestamp.next();
+                    if (reverse) {
+                        iterWithTimestamp.prev();
+                    } else {
+                        iterWithTimestamp.next();
+                    }
                 }
             } else {
                 if (nextWithTimestamp == null) {
                     next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
                     nextNoTimestamp = null;
-                    iterNoTimestamp.next();
+                    if (reverse) {
+                        iterNoTimestamp.prev();
+                    } else {
+                        iterNoTimestamp.next();
+                    }
                 } else {
                     if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {

Review comment:
       If `compare(noTimestamp, withTimestamp) <= 0` then `withTimestamp >=
noTimestamp`, so in the reverse case we actually want to return
`withTimestamp` next here (and vice versa in the else block).
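
To make the ordering concrete, here is a rough sketch of a two-source merge that flips the comparison in reverse mode. It uses plain string lists as stand-ins for the two column-family iterators and assumes the two sources never share a key. The class and method names are hypothetical, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class DualSourceMergeSketch {

    /**
     * Merges two key lists that are each already ordered in the direction
     * of iteration (ascending for forward, descending for reverse).
     */
    static List<String> merge(final List<String> noTimestamp,
                              final List<String> withTimestamp,
                              final boolean reverse) {
        final Comparator<String> comparator = Comparator.naturalOrder();
        final List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < noTimestamp.size() && j < withTimestamp.size()) {
            final int cmp = comparator.compare(noTimestamp.get(i), withTimestamp.get(j));
            // Forward: emit the smaller key first (noTimestamp when cmp <= 0).
            // Reverse: emit the larger key first (withTimestamp when cmp <= 0).
            final boolean takeNoTimestamp = reverse ? cmp > 0 : cmp <= 0;
            if (takeNoTimestamp) {
                out.add(noTimestamp.get(i++));
            } else {
                out.add(withTimestamp.get(j++));
            }
        }
        // At most one of the two sources still has keys left.
        while (i < noTimestamp.size()) {
            out.add(noTimestamp.get(i++));
        }
        while (j < withTimestamp.size()) {
            out.add(withTimestamp.get(j++));
        }
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(merge(Arrays.asList("a", "c"), Arrays.asList("b", "d"), false));
        System.out.println(merge(Arrays.asList("c", "a"), Arrays.asList("d", "b"), true));
    }
}
```

Keeping the forward condition unchanged in reverse mode would emit the smaller of the two head keys first and break the descending order.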

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
##########
@@ -58,7 +61,11 @@ public synchronized boolean hasNext() {
             return allDone();
         } else {
             next = getKeyValue();
-            iter.next();
+            if (reverse) {

Review comment:
       Instead of storing and checking the `reverse` flag on every iteration,
can we define something like
   ```
   java.util.function.Consumer<RocksIterator> advanceIterator = reverse ? RocksIterator::prev : RocksIterator::next;
   ```
   so that we can just blindly call `advanceIterator.accept(iter)`?
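
As a self-contained illustration of that idea, the sketch below picks the advance function once and then reuses it in the loop. `Cursor` is a hypothetical stand-in for `RocksIterator` over an in-memory list so the example runs without RocksDB. The `drain` helper is also made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class AdvanceDirectionSketch {

    /** Hypothetical stand-in for RocksIterator: a cursor over a sorted list. */
    static final class Cursor {
        private final List<String> keys;
        private int pos;

        Cursor(final List<String> keys, final int start) {
            this.keys = keys;
            this.pos = start;
        }

        boolean isValid() {
            return pos >= 0 && pos < keys.size();
        }

        String key() {
            return keys.get(pos);
        }

        void next() {
            pos++;
        }

        void prev() {
            pos--;
        }
    }

    /** Collects all keys, walking forward or backward depending on {@code reverse}. */
    static List<String> drain(final List<String> keys, final boolean reverse) {
        final Cursor iter = new Cursor(keys, reverse ? keys.size() - 1 : 0);
        // The direction is decided once here instead of on every step.
        final Consumer<Cursor> advanceIterator = reverse ? Cursor::prev : Cursor::next;

        final List<String> out = new ArrayList<>();
        while (iter.isValid()) {
            out.add(iter.key());
            advanceIterator.accept(iter);
        }
        return out;
    }

    public static void main(final String[] args) {
        final List<String> keys = Arrays.asList("a", "b", "c");
        System.out.println(drain(keys, false));
        System.out.println(drain(keys, true));
    }
}
```

The `reverse` value is still needed once to choose the starting position, but the per-step branch disappears from the loop body.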

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
##########
@@ -188,7 +188,55 @@ public void testPutGetRange() {
     }
 
     @Test
-    public void testPutGetRangeWithDefaultSerdes() {
+    public void testPutGetReverseRange() {

Review comment:
       I think we should add some tests to `RocksDBTimestampedStoreTest`. I
thought it would extend `AbstractKeyValueStoreTest` and thus benefit from
everything you added here, but that doesn't seem to be the case :/
   Personally, I found the `RocksDBDualCFIterator` logic a bit difficult to
follow even before the reverse iteration, so it would be nice to have some
tests specifically covering reverse iterators over multi-column-family
timestamped stores.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -321,67 +368,94 @@ public void shouldClearNamespaceCacheOnClose() {
         assertEquals(0, cache.size());
     }
 
-    @Test(expected = InvalidStateStoreException.class)

Review comment:
       Awesome, thank you for cleaning up the whole class 🙏 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
##########
@@ -374,7 +412,7 @@ public Bytes peekNextKey() {
             if (next == null) {
                 return allDone();
             } else {
-                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                if (comparator.compare(next.key.get(), lastKey) <= 0) {

Review comment:
       Kind of unrelated, but WDYT about renaming `RocksDBDualCFIterator` to
`RocksDBDualCFAllIterator` or something on the side? I feel like these
iterators could be cleaned up a bit in general to be more understandable -- for
example, it's weird that we do the `iterator#seek`-ing in the actual `all()`
method, but for range queries we do the seeking inside the iterator constructor.
   Just thinking out loud though; we can do some follow-up refactoring once
this is merged.





