vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r706578593
##########
File path:
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
time);
}
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }
Review comment:
@ableegoldman I agree that the metering layer is the most sensible place
to keep the filtering logic, as that would provide the flexibility to add
it for any store.
Let me explain the scenario with
`shouldNotThrowConcurrentModificationException`.
Here is the test case in AbstractWindowBytesStoreTest:
```
try (final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all()) {
    currentTime += WINDOW_SIZE * 10;
    windowStore.put(1, "three", currentTime);
    currentTime += WINDOW_SIZE * 10;
    windowStore.put(2, "four", currentTime);
    // Iterator should return all records in store and not throw exception b/c some were added after fetch
    assertEquals(windowedPair(1, "one", 0), iterator.next());
    assertEquals(windowedPair(1, "two", WINDOW_SIZE * 10), iterator.next());
    assertEquals(windowedPair(1, "three", WINDOW_SIZE * 20), iterator.next());
    assertEquals(windowedPair(2, "four", WINDOW_SIZE * 30), iterator.next());
    assertFalse(iterator.hasNext());
}
```
After all the keys present in the store have been fetched using `all()`,
2 new keys are `put` during iteration. With the current implementation in
trunk, it works fine, but with the filtering logic that I have added for
`all()` in `filterExpiredRecords`:
```
private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
    final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
    final long observedStreamTime = getObservedStreamTime(wrapped());
    if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
        return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
    final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
    final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>();
    while (allWindowedKeyValueIterator.hasNext()) {
        final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
        if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
            expiredRecords.add(next);
        }
    }
    for (KeyValue<Windowed<Bytes>, byte[]> record : expiredRecords) {
        wrapped().put(record.key.key(), null, record.key.window().start());
    }
    return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
    // return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time);
}
```
all the keys are fetched first and the filtering happens afterwards.
At this point, if you notice, I am essentially creating a new iterator, and
that is what is being returned. Now, in the failing test case, when `put`
inserts 2 more keys during iteration, that is visible only to the original
iterator in `wrapped()` and not to the iterator being returned. That is why
the test case fails on
`assertEquals(windowedPair(1, "three", WINDOW_SIZE * 20), iterator.next());`
as there are no more records. Just for testing, if I swap the last return
statement for the one before it and run the test case again, it works fine.
So either there is something wrong with the way I have implemented this, in
which case any suggestions would be welcome, or, because of this edge case,
the filtering may need to be pushed further down into the underlying store.
Let me know if this makes sense.
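To show the difference in isolation, here is a minimal sketch, not the actual store code. It assumes a plain `ConcurrentSkipListMap` as a hypothetical stand-in for the wrapped store (keyed by window start time), and the class and method names are made up for illustration. An iterator taken directly from the map is weakly consistent, so in this single-threaded run it picks up entries appended after it was created, while an iterator over a copied `ArrayList` only ever sees the snapshot:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

public class SnapshotVsLiveIterator {

    // Hypothetical stand-in for the wrapped store: window start time -> value.
    static ConcurrentSkipListMap<Long, String> newStore() {
        final ConcurrentSkipListMap<Long, String> store = new ConcurrentSkipListMap<>();
        store.put(0L, "one");
        store.put(10L, "two");
        return store;
    }

    // Collects everything the iterator still has to offer.
    static List<Long> drain(final Iterator<Long> iterator) {
        final List<Long> seen = new ArrayList<>();
        while (iterator.hasNext()) {
            seen.add(iterator.next());
        }
        return seen;
    }

    // Iterator taken straight from the map, then two puts, then iteration.
    static List<Long> liveView() {
        final ConcurrentSkipListMap<Long, String> store = newStore();
        final Iterator<Long> iterator = store.keySet().iterator();
        store.put(20L, "three");
        store.put(30L, "four");
        return drain(iterator);
    }

    // Keys copied into a list first (like the filtered list in the comment),
    // then the same two puts, then iteration over the copy.
    static List<Long> snapshotView() {
        final ConcurrentSkipListMap<Long, String> store = newStore();
        final Iterator<Long> iterator = new ArrayList<>(store.keySet()).iterator();
        store.put(20L, "three");
        store.put(30L, "four");
        return drain(iterator);
    }

    public static void main(final String[] args) {
        System.out.println("live:     " + liveView());
        System.out.println("snapshot: " + snapshotView());
    }
}
```

The snapshot variant stops after the two original keys, which mirrors the `iterator.next()` failure on the third assertion in the test. Note that weakly consistent iterators may, but are not guaranteed to, reflect later modifications, so the live behaviour here should be read as an illustration rather than a contract.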