ableegoldman commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r476969847
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java ########## @@ -104,7 +121,47 @@ public V fetch(final K key, final long time) { return new KeyValueIterator<Windowed<K>, V>() { @Override - public void close() {} + public void close() { + } + + @Override + public Windowed<K> peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public KeyValue<Windowed<K>, V> next() { + return iterator.next(); + } + + }; + } + + @Override + public KeyValueIterator<Windowed<K>, V> backwardAll() { + if (!open) { + throw new InvalidStateStoreException("Store is not open"); + } + final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>(); + for (final long now : data.keySet()) { Review comment: Same here (and in all the backward ReadOnlyWindowStoreStub methods): I think we are kind of forced to invert the key ordering for the backward fetch methods as well, even if we don't necessarily want to. Users probably shouldn't be relying on a strict ordering of the keys anyway, but we do have to match the ordering of the cache. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ########## @@ -201,18 +203,48 @@ public synchronized void put(final Bytes key, } final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = wrapped().persistent() ? 
- new CacheIteratorWrapper(key, timeFrom, timeTo) : - context.cache().range(cacheName, - cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)), - cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)) + new CacheIteratorWrapper(key, timeFrom, timeTo, true) : + context.cache().range( + cacheName, + cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, timeFrom)), + cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, timeTo)) ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, timeFrom, timeTo); - final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator( - cacheIterator, hasNextCondition, cacheFunction - ); + final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = + new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator); + return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true); + } + + @Override + public synchronized WindowStoreIterator<byte[]> backwardFetch(final Bytes key, + final Instant from, + final Instant to) { Review comment: I guess we should use the long signature here too, and do the conversion from Instant to long in a default implementation on the WindowStore interface? 
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ########## @@ -416,26 +552,43 @@ private long currentSegmentLastTime() { } private void getNextSegmentIterator() { - ++currentSegmentId; - lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); + if (forward) { + ++currentSegmentId; + lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); - if (currentSegmentId > lastSegmentId) { - current = null; - return; - } + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } - setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); - current.close(); - current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + current.close(); + + current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); + } else { + --currentSegmentId; +// lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); Review comment: I'm guessing this `lastSegmentId` assignment is not actually meant to be commented out. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java ########## @@ -287,17 +308,43 @@ public void shouldFetchAllInTimeRange() { final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4); final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5); - assertEquals( - new HashSet<>(asList(one, two, four)), - toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4))) + assertArrayEquals( + new LinkedHashSet<>(asList(one, two, four)).toArray(), + toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4))).toArray() ); - assertEquals( - new HashSet<>(asList(zero, one, two)), - toSet(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3))) + assertArrayEquals( 
+ new LinkedHashSet<>(asList(zero, one, two)).toArray(), + toSet(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3))).toArray() ); - assertEquals( - new HashSet<>(asList(one, two, four, five)), - toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5))) + assertArrayEquals( + new LinkedHashSet<>(asList(one, two, four, five)).toArray(), Review comment: It feels kind of ridiculous to convert this from a list to a set to an array all in one line. Maybe we can use `assertThat(result, equalTo(expectedResult))` here like we've started to do elsewhere in Streams? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org