ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476969847



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
##########
@@ -104,7 +121,47 @@ public V fetch(final K key, final long time) {
 
         return new KeyValueIterator<Windowed<K>, V>() {
             @Override
-            public void close() {}
+            public void close() {
+            }
+
+            @Override
+            public Windowed<K> peekNextKey() {
+                throw new UnsupportedOperationException("peekNextKey() not 
supported in " + getClass().getName());
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public KeyValue<Windowed<K>, V> next() {
+                return iterator.next();
+            }
+
+        };
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardAll() {
+        if (!open) {
+            throw new InvalidStateStoreException("Store is not open");
+        }
+        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
+        for (final long now : data.keySet()) {

Review comment:
       Same here (and all the backwards ReadOnlyWindowStoreStub methods): I 
think we are kind of forced to invert the key ordering for the backwards fetch 
methods as well, even if we don't necessarily want to. Probably users shouldn't 
be relying on a strict ordering of the keys anyway but we do have to match the 
ordering of the cache

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -201,18 +203,48 @@ public synchronized void put(final Bytes key,
         }
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
-            new CacheIteratorWrapper(key, timeFrom, timeTo) :
-            context.cache().range(cacheName,
-                        
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)),
-                        
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo))
+            new CacheIteratorWrapper(key, timeFrom, timeTo, true) :
+            context.cache().range(
+                cacheName,
+                cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, 
timeFrom)),
+                cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
timeTo))
             );
 
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key, key, timeFrom, timeTo);
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator = new FilteredCacheIterator(
-            cacheIterator, hasNextCondition, cacheFunction
-        );
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator =
+            new FilteredCacheIterator(cacheIterator, hasNextCondition, 
cacheFunction);
 
-        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator);
+        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator, true);
+    }
+
+    @Override
+    public synchronized WindowStoreIterator<byte[]> backwardFetch(final Bytes 
key,
+                                                                  final 
Instant from,
+                                                                  final 
Instant to) {

Review comment:
      I think we should use the long-based signature here too, and do the 
conversion from Instant to long in a default implementation on the WindowStore 
interface — that keeps the conversion logic in one place.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -416,26 +552,43 @@ private long currentSegmentLastTime() {
         }
 
         private void getNextSegmentIterator() {
-            ++currentSegmentId;
-            lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+            if (forward) {
+                ++currentSegmentId;
+                lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
 
-            if (currentSegmentId > lastSegmentId) {
-                current = null;
-                return;
-            }
+                if (currentSegmentId > lastSegmentId) {
+                    current = null;
+                    return;
+                }
 
-            setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
+                setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());
 
-            current.close();
-            current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+                current.close();
+
+                current = context.cache().range(cacheName, cacheKeyFrom, 
cacheKeyTo);
+            } else {
+                --currentSegmentId;
+//                lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));

Review comment:
      I'm guessing this line is not actually meant to be commented out — it 
should either be restored or removed rather than left as dead code.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -287,17 +308,43 @@ public void shouldFetchAllInTimeRange() {
         final KeyValue<Windowed<Integer>, String> four = windowedPair(4, 
"four", startTime + 4);
         final KeyValue<Windowed<Integer>, String> five = windowedPair(5, 
"five", startTime + 5);
 
-        assertEquals(
-            new HashSet<>(asList(one, two, four)),
-            toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), 
ofEpochMilli(startTime + 4)))
+        assertArrayEquals(
+            new LinkedHashSet<>(asList(one, two, four)).toArray(),
+            toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), 
ofEpochMilli(startTime + 4))).toArray()
         );
-        assertEquals(
-            new HashSet<>(asList(zero, one, two)),
-            toSet(windowStore.fetchAll(ofEpochMilli(startTime + 0), 
ofEpochMilli(startTime + 3)))
+        assertArrayEquals(
+            new LinkedHashSet<>(asList(zero, one, two)).toArray(),
+            toSet(windowStore.fetchAll(ofEpochMilli(startTime + 0), 
ofEpochMilli(startTime + 3))).toArray()
         );
-        assertEquals(
-            new HashSet<>(asList(one, two, four, five)),
-            toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), 
ofEpochMilli(startTime + 5)))
+        assertArrayEquals(
+            new LinkedHashSet<>(asList(one, two, four, five)).toArray(),

Review comment:
      It feels kind of ridiculous to convert this from a list to a set to an 
array all in one line. Maybe we could use `assertThat(result, 
equalTo(expectedResult))` here instead, as we've started to do elsewhere in 
Streams?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to