vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r706578593



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
@ableegoldman I agree that the metering layer is the most sensible place to keep the filtering logic, since that gives us the flexibility to add it for any store.

Let me explain the scenario with `shouldNotThrowConcurrentModificationException`:
   
If you look at the test case in `AbstractWindowBytesStoreTest`:
   
```java
try (final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all()) {

    currentTime += WINDOW_SIZE * 10;
    windowStore.put(1, "three", currentTime);

    currentTime += WINDOW_SIZE * 10;
    windowStore.put(2, "four", currentTime);

    // Iterator should return all records in store and not throw exception b/c some were added after fetch
    assertEquals(windowedPair(1, "one", 0), iterator.next());
    assertEquals(windowedPair(1, "two", WINDOW_SIZE * 10), iterator.next());
    assertEquals(windowedPair(1, "three", WINDOW_SIZE * 20), iterator.next());
    assertEquals(windowedPair(2, "four", WINDOW_SIZE * 30), iterator.next());
    assertFalse(iterator.hasNext());
}
```
   
After all the keys present in the store are fetched using `all()`, two new keys are `put` during iteration. With the current implementation in trunk this works fine, but not with the filtering logic that I have added for `all()` in `filterExpiredRecords`:
   
```java
private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
    final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
    final long observedStreamTime = getObservedStreamTime(wrapped());
    if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
        return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

    final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
    final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>();

    while (allWindowedKeyValueIterator.hasNext()) {
        final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
        if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
            expiredRecords.add(next);
        }
    }
    for (KeyValue<Windowed<Bytes>, byte[]> record: expiredRecords) {
        wrapped().put(record.key.key(), null, record.key.window().start());
    }
    return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
//        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time);
}
```
   
First, all the keys are fetched using `all()`, and the filtering happens afterwards. At this point, I am essentially creating a new iterator, and that is what gets returned. Now, in the failing test case, when `put` inserts two more keys during iteration, those writes are reflected in the original iterator in `wrapped()` but not in the iterator being returned. That's why the test case fails on

`assertEquals(windowedPair(1, "three", WINDOW_SIZE * 20), iterator.next());`

as there are no more records. Just for testing, if I uncomment the last return statement in favour of the one before it and run the test case again, it works fine.
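The visibility problem described above can be reproduced without Kafka Streams. The following is a minimal sketch using plain Java collections, not the actual store classes; `LiveIterator`, `drain`, and `run` are hypothetical names used only for illustration. An iterator that consults the backing list on every step returns records written after it was created, while an iterator over a list copied up front (roughly what collecting `windowedKeyValuesInBoundary` in the diff does) does not.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SnapshotVsLiveIterator {

    // Hypothetical "live" iterator: it re-reads the backing list's size on every
    // hasNext(), so elements appended after construction are still returned
    // (a stand-in for a store iterator that observes later writes).
    static final class LiveIterator<T> implements Iterator<T> {
        private final List<T> backing;
        private int position = 0;

        LiveIterator(final List<T> backing) {
            this.backing = backing;
        }

        @Override
        public boolean hasNext() {
            return position < backing.size();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return backing.get(position++);
        }
    }

    // Collects whatever an iterator still has to offer into a list.
    static <T> List<T> drain(final Iterator<T> iterator) {
        final List<T> out = new ArrayList<>();
        while (iterator.hasNext()) {
            out.add(iterator.next());
        }
        return out;
    }

    // Returns [live results, snapshot results].
    static List<List<String>> run() {
        final List<String> store = new ArrayList<>(List.of("one", "two"));

        final Iterator<String> live = new LiveIterator<>(store);
        // Eager approach: copy the current contents into a new list and iterate
        // over the copy, similar to materializing the filtered records.
        final Iterator<String> snapshot = new ArrayList<>(store).iterator();

        // Writes that happen after both iterators exist, like the test's put() calls.
        store.add("three");
        store.add("four");

        return List.of(drain(live), drain(snapshot));
    }

    public static void main(final String[] args) {
        final List<List<String>> results = run();
        System.out.println("live:     " + results.get(0));
        System.out.println("snapshot: " + results.get(1));
    }
}
```

Running `main` prints all four values for the live iterator but only `one` and `two` for the snapshot, which mirrors the missing `"three"` record in the failing assertion.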
   
So either there's something wrong with the way I have implemented it, in which case any suggestions would be welcome, or, because of this edge case, the filtering might need to be pushed further down into the underlying store. Let me know if this makes sense.
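One possible direction, sketched here under assumptions rather than as a tested patch, is to keep the filtering in the metering layer but apply it lazily: wrap the underlying iterator and skip expired records as they are pulled, instead of collecting the survivors into a list first. The sketch below uses a simplified `WindowedRecord` class and plain `java.util.Iterator` in place of Kafka's `Windowed`/`KeyValueIterator` types; `ExpiryFilteringIterator` and `liveView` are hypothetical names. It reuses the boundary from the diff, `observedStreamTime - retentionPeriod + 1`, and the same expiry test (window end strictly before the boundary).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class LazyExpiryFilter {

    // Hypothetical, simplified stand-in for a windowed record (times in ms).
    static final class WindowedRecord {
        final String key;
        final long windowStart;
        final long windowEnd;

        WindowedRecord(final String key, final long windowStart, final long windowEnd) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }
    }

    // Wraps another iterator and skips records whose window ended before the
    // retention boundary. Nothing is copied up front: each hasNext() pulls from
    // the wrapped iterator, so whatever that iterator can see stays visible.
    static final class ExpiryFilteringIterator implements Iterator<WindowedRecord> {
        private final Iterator<WindowedRecord> inner;
        private final long windowStartBoundary;
        private WindowedRecord nextRecord; // next non-expired record, or null if not fetched yet

        ExpiryFilteringIterator(final Iterator<WindowedRecord> inner,
                                final long observedStreamTime,
                                final long retentionPeriod) {
            this.inner = inner;
            // Same boundary as in the diff; computed once, at creation time.
            this.windowStartBoundary = observedStreamTime - retentionPeriod + 1;
        }

        @Override
        public boolean hasNext() {
            while (nextRecord == null && inner.hasNext()) {
                final WindowedRecord candidate = inner.next();
                // Keep the record unless its window ended before the boundary.
                if (candidate.windowEnd >= windowStartBoundary) {
                    nextRecord = candidate;
                }
            }
            return nextRecord != null;
        }

        @Override
        public WindowedRecord next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final WindowedRecord result = nextRecord;
            nextRecord = null;
            return result;
        }
    }

    // Hypothetical stand-in for a store iterator that observes later writes.
    static <T> Iterator<T> liveView(final List<T> backing) {
        return new Iterator<T>() {
            private int position = 0;

            @Override
            public boolean hasNext() {
                return position < backing.size();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return backing.get(position++);
            }
        };
    }

    static List<String> run() {
        final List<WindowedRecord> store = new ArrayList<>();
        store.add(new WindowedRecord("old", 0L, 50L));       // expired: 50 < 200 - 100 + 1
        store.add(new WindowedRecord("recent", 100L, 150L)); // kept

        final Iterator<WindowedRecord> filtered =
            new ExpiryFilteringIterator(liveView(store), 200L, 100L);

        // Written after the filtering iterator was created, like the test's later put() calls.
        store.add(new WindowedRecord("late", 190L, 240L));

        final List<String> keys = new ArrayList<>();
        while (filtered.hasNext()) {
            keys.add(filtered.next().key);
        }
        return keys;
    }

    public static void main(final String[] args) {
        System.out.println(run()); // prints [recent, late]
    }
}
```

Because nothing is materialized, the record written after the iterator was created (`late`) is still returned as long as the wrapped iterator can see it. A real version would also have to implement `peekNextKey()` and `close()` from `KeyValueIterator`, and the boundary here stays fixed for the iterator's lifetime even if observed stream time advances during iteration.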
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
