showuon commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r730618511
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -239,9 +254,31 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
         );
     }
 
+    private long getObservedStreamTime(final StateStore stateStore) {
+        if (stateStore instanceof PersistentSessionStore) {
+            return ((PersistentSessionStore) stateStore).getObservedStreamTime();
+        } else if (stateStore instanceof WrappedStateStore) {
+            return getObservedStreamTime(((WrappedStateStore) stateStore).wrapped());
+        } else {
+            return ConsumerRecord.NO_TIMESTAMP;
+        }
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+
+        if (wrapped().persistent()) {
+            final long actualEarliestSessionEndTime = getActualEarliestSessionEndTime(wrapped(), 0);
+            return new MeteredWindowedKeyValueIterator<>(
+                wrapped().findSessions(
+                    keyBytes(key), actualEarliestSessionEndTime, Long.MAX_VALUE),
+                fetchSensor,
+                streamsMetrics,
+                serdes,
+                time);
+        }
+
         return new MeteredWindowedKeyValueIterator<>(

Review comment:
   I know that in this issue we only want to improve the non-in-memory window stores, but do you think we could make the in-memory stores return the `MeteredWindowedKeyValueIterator` with `actualEarliestSessionEndTime` as well?

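To make the lookup in `getObservedStreamTime` easier to discuss, here is a minimal, self-contained sketch of the same unwrap-until-found pattern. The interfaces `Store`, `TimeTrackingStore` and `WrappingStore` are hypothetical stand-ins for `StateStore`, `PersistentSessionStore` and `WrappedStateStore`; they are not the Kafka Streams types. The `-1` sentinel is assumed to mirror `ConsumerRecord.NO_TIMESTAMP`.

```java
// Sketch only: hypothetical stand-in types, not the Kafka Streams API.
final class ObservedStreamTimeLookup {

    // Assumed to mirror ConsumerRecord.NO_TIMESTAMP.
    public static final long NO_TIMESTAMP = -1L;

    // Stand-in for StateStore.
    public interface Store { }

    // Stand-in for a store that tracks the stream time it has observed.
    public interface TimeTrackingStore extends Store {
        long observedStreamTime();
    }

    // Stand-in for WrappedStateStore: a layer around another store.
    public interface WrappingStore extends Store {
        Store wrapped();
    }

    // Peel off wrapper layers until a time-tracking store is found.
    // Falls back to NO_TIMESTAMP when no layer tracks stream time.
    public static long observedStreamTime(final Store store) {
        if (store instanceof TimeTrackingStore) {
            return ((TimeTrackingStore) store).observedStreamTime();
        } else if (store instanceof WrappingStore) {
            return observedStreamTime(((WrappingStore) store).wrapped());
        } else {
            return NO_TIMESTAMP;
        }
    }

    private ObservedStreamTimeLookup() { }
}
```

Because the fallback is a sentinel rather than an exception, callers for in-memory stores would simply see "no stream time observed", which is why the `persistent()` guard matters in the current patch.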
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -239,9 +254,31 @@ public V fetchSession(final K key, final long earliestSessionEndTime, final long
         );
     }
 
+    private long getObservedStreamTime(final StateStore stateStore) {
+        if (stateStore instanceof PersistentSessionStore) {
+            return ((PersistentSessionStore) stateStore).getObservedStreamTime();
+        } else if (stateStore instanceof WrappedStateStore) {
+            return getObservedStreamTime(((WrappedStateStore) stateStore).wrapped());
+        } else {
+            return ConsumerRecord.NO_TIMESTAMP;
+        }
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+
+        if (wrapped().persistent()) {
+            final long actualEarliestSessionEndTime = getActualEarliestSessionEndTime(wrapped(), 0);
+            return new MeteredWindowedKeyValueIterator<>(
+                wrapped().findSessions(
+                    keyBytes(key), actualEarliestSessionEndTime, Long.MAX_VALUE),
+                fetchSensor,
+                streamsMetrics,
+                serdes,
+                time);
+        }
+
         return new MeteredWindowedKeyValueIterator<>(

Review comment:
   The same comment applies to the similar places below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
##########
@@ -60,7 +103,8 @@ public void shouldRemoveExpired() {
         try (final KeyValueIterator<Windowed<String>, Long> iterator =
             sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
         ) {
-            assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
+            assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));

Review comment:
   Thanks for adding the comments. So the comment on line 100 is no longer valid (or should be updated), right?
   // Advance stream time to expire the first record

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -201,6 +205,9 @@ public V fetch(final K key,
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
+                if (wrapped().persistent() && timestamp <= getObservedStreamTime(wrapped()) - retentionPeriod) {
+                    return null;

Review comment:
   I think `wrapped().persistent()` can be removed. This logic should apply to all window stores, right?
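
For reference, the expiry condition in the `MeteredWindowStore` hunk can be isolated as a small predicate. This is only a sketch: the class `RetentionCheck` and the method `isExpired` are hypothetical names, and the `-1` sentinel is assumed to mirror `ConsumerRecord.NO_TIMESTAMP`. The comparison itself is copied from the patch.

```java
// Sketch only: RetentionCheck and isExpired are illustrative names,
// not part of the pull request.
final class RetentionCheck {

    // Assumed to mirror ConsumerRecord.NO_TIMESTAMP.
    public static final long NO_TIMESTAMP = -1L;

    // Same comparison as in the patch: a window is treated as expired once
    // its timestamp is at or before (observed stream time - retention period).
    public static boolean isExpired(final long timestamp,
                                    final long observedStreamTime,
                                    final long retentionPeriod) {
        return timestamp <= observedStreamTime - retentionPeriod;
    }

    private RetentionCheck() { }
}
```

With the sentinel as the observed stream time and a non-negative retention period, the right-hand side is negative, so no non-negative timestamp is reported as expired. That suggests the check would be harmless for any store that does not track stream time, independent of the `persistent()` guard.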