jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r475910689



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########
@@ -119,15 +118,16 @@
      * <p>
      * This iterator must be closed after use.
      *
-     * @param from      the first key in the range
-     * @param to        the last key in the range
-     * @param timeFrom  time range start (inclusive)
-     * @param timeTo    time range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive)
+     * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if one of the given keys is {@code null}
+     * @throws NullPointerException       if one of the given keys is {@code 
null}
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if 
super#fetch(...) is removed
+    // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long 
timeTo);

Review comment:
       These methods were introduced when adding Duration/Instant support 
https://github.com/apache/kafka/pull/5682.
   
   I don't think these are needed, we can do a similar change as for 
SessionStore read operations. wdyt?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -419,13 +504,13 @@ Long minTime() {
         }

Review comment:
       For windowStore, only time-based index is been iterated backward. The 
KIP didn't considered reversing key/value stores internally. 
   
   We would need another flag (apart from backward) to define order of internal 
keys, which its cumbersome, and the order between keys doesn't matter much or 
can be calculated by the user. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
             setCacheKeyRange(currentSegmentBeginTime(), 
currentSegmentLastTime());

Review comment:
       Will have to double check this. I have inverted the current/last segment 
for backwards use-case though.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -72,22 +86,40 @@
             searchSpace.iterator(),

Review comment:
       `searchSpace` will be reversed based on the `forward` flag, on 
`AbstractSegments`. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final 
long windowStartTimes
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+        return fetch(key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
Instant from, final Instant to) {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(from, 
prepareMillisCheckFailMsgPrefix(from, "from"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(to, 
prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
       Only backward compatibility. If it make sense to remove these 
deprecations as part of this KIP, I'd be happy to help cleaning it. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -337,25 +462,32 @@ public synchronized void close() {
 
         private CacheIteratorWrapper(final Bytes key,
                                      final long timeFrom,
-                                     final long timeTo) {
-            this(key, key, timeFrom, timeTo);
+                                     final long timeTo,
+                                     final boolean forward) {
+            this(key, key, timeFrom, timeTo, forward);
         }
 
         private CacheIteratorWrapper(final Bytes keyFrom,
                                      final Bytes keyTo,
                                      final long timeFrom,
-                                     final long timeTo) {
+                                     final long timeTo,
+                                     final boolean forward) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.timeTo = timeTo;
             this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, 
maxObservedTimestamp.get()));
+            this.forward = forward;
 
             this.segmentInterval = cacheFunction.getSegmentInterval();
             this.currentSegmentId = cacheFunction.segmentId(timeFrom);

Review comment:
       great catch! I think this hasn't pop up yet in the tests as all tests 
may be using the same segment.
   
   Will double check to add more tests to validate this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to