vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r702070313



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }
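
The retention cutoff in the hunk above (`observedStreamTime - retentionPeriod + 1`) can be worked through in isolation. This is a minimal sketch on plain `long` millisecond timestamps with hypothetical values (`observedStreamTime = 100`, `retentionPeriod = 30`); the helpers `windowStartBoundary`, `clampTimeFrom` and `isExpired` are illustrative names that mirror `getActualWindowStartTime()` and the `endTime().isBefore(...)` check, not Kafka APIs. Because the check is a strict `isBefore`, a window ending exactly at the boundary is kept.

```java
import java.time.Instant;

public class RetentionBoundaryExample {

    // Hypothetical helper: oldest timestamp still inside retention (inclusive),
    // computed the same way as windowStartBoundary in filterExpiredRecords().
    static long windowStartBoundary(final long observedStreamTime, final long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Hypothetical helper mirroring getActualWindowStartTime(): never start before the boundary.
    static long clampTimeFrom(final long timeFrom, final long boundary) {
        return Math.max(timeFrom, boundary);
    }

    // Mirrors next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary)):
    // a record is dropped only if its window ends strictly before the boundary.
    static boolean isExpired(final long windowEndMs, final long boundary) {
        return Instant.ofEpochMilli(windowEndMs).isBefore(Instant.ofEpochMilli(boundary));
    }

    public static void main(final String[] args) {
        final long boundary = windowStartBoundary(100L, 30L);
        System.out.println("boundary = " + boundary);                      // boundary = 71
        System.out.println("timeFrom 0 -> " + clampTimeFrom(0L, boundary)); // timeFrom 0 -> 71
        System.out.println("end 70 expired? " + isExpired(70L, boundary));  // end 70 expired? true
        System.out.println("end 71 expired? " + isExpired(71L, boundary));  // end 71 expired? false
    }
}
```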

Review comment:
   @mjsax I have a question here. In the JIRA ticket, you mentioned that the best place to add this filtering is the MeteredStore, because that implicitly applies the logic to custom state stores as well. For the most part this approach (fetching the relevant records and then filtering them in the MeteredStore) has worked fine, but there is one case where it fails: test cases like `shouldNotThrowConcurrentModificationException`. This seems to be because `filterExpiredRecords()` drains the wrapped iterator into a list before returning, so a record written by `put()` while iterating is only visible through the wrapped instance's iterator and never reaches that already-materialized list.
   
   Given this, do you think it would be a good idea to move this logic into the actual RocksDB implementations? Or do you see a better way to do it here in the MeteredStore class itself?
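
One possible direction for keeping the filtering in the MeteredStore is to filter lazily instead of copying into a list. The sketch below is a simplified, self-contained model, not Kafka code: `LiveLog` is a hypothetical stand-in for the wrapped store whose iterator still visits values appended after it was created, plain `Long` window end times stand in for `KeyValue<Windowed<Bytes>, byte[]>` entries, and the cutoff of 71 is a hypothetical `observedStreamTime - retentionPeriod + 1` with `observedStreamTime = 100` and `retentionPeriod = 30`. It contrasts an eager copy, like the one in `filterExpiredRecords()`, with a lazily filtering wrapper.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class LazyExpiryFilterSketch {

    // Hypothetical stand-in for the wrapped store: its iterator reads by index,
    // so values appended after the iterator was created are still visited.
    static final class LiveLog<T> implements Iterable<T> {
        private final List<T> items = new ArrayList<>();

        void append(final T item) {
            items.add(item);
        }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private int index = 0;

                @Override
                public boolean hasNext() {
                    return index < items.size();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return items.get(index++);
                }
            };
        }
    }

    // Eager filtering: the source is fully drained at call time.
    static <T> Iterator<T> filterEagerly(final Iterator<T> source, final Predicate<T> keep) {
        final List<T> kept = new ArrayList<>();
        while (source.hasNext()) {
            final T next = source.next();
            if (keep.test(next)) {
                kept.add(next);
            }
        }
        return kept.iterator();
    }

    // Lazy filtering: expired values are skipped only as the caller advances.
    static <T> Iterator<T> filterLazily(final Iterator<T> source, final Predicate<T> keep) {
        return new Iterator<T>() {
            private T buffered;
            private boolean hasBuffered = false;

            @Override
            public boolean hasNext() {
                // Pull from the source until a value passes the filter or the source is exhausted.
                while (!hasBuffered && source.hasNext()) {
                    final T candidate = source.next();
                    if (keep.test(candidate)) {
                        buffered = candidate;
                        hasBuffered = true;
                    }
                }
                return hasBuffered;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                hasBuffered = false;
                return buffered;
            }
        };
    }

    static <T> List<T> drain(final Iterator<T> it) {
        final List<T> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(final String[] args) {
        // Values are window end times; anything ending before 71 counts as expired.
        final Predicate<Long> notExpired = end -> end >= 71L;
        final LiveLog<Long> store = new LiveLog<>();
        store.append(50L); // expired
        store.append(80L);

        final Iterator<Long> eager = filterEagerly(store.iterator(), notExpired);
        final Iterator<Long> lazy = filterLazily(store.iterator(), notExpired);
        store.append(90L); // a "put()" after both iterators were created

        System.out.println("eager: " + drain(eager)); // eager: [80]
        System.out.println("lazy:  " + drain(lazy));  // lazy:  [80, 90]
    }
}
```

With the eager version, the value written after the iterators were created (90) never shows up; the lazy version sees it because it only pulls from the source on demand. Whether a late `put()` is actually visible in the real store still depends on the wrapped iterator's own semantics; lazy filtering merely avoids adding a snapshot of its own. A real wrapper around Kafka's `KeyValueIterator` would also have to implement `peekNextKey()` (skipping expired entries there too) and delegate `close()`.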
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
