showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r730618511



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -239,9 +254,31 @@ public V fetchSession(final K key, final long 
earliestSessionEndTime, final long
         );
     }
 
+    private long getObservedStreamTime(final StateStore stateStore) {
+        if (stateStore instanceof PersistentSessionStore) {
+            return ((PersistentSessionStore) 
stateStore).getObservedStreamTime();
+        } else if (stateStore instanceof WrappedStateStore) {
+            return getObservedStreamTime(((WrappedStateStore) 
stateStore).wrapped());
+        } else {
+            return ConsumerRecord.NO_TIMESTAMP;
+        }
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+
+        if (wrapped().persistent()) {
+            final long actualEarliestSessionEndTime = 
getActualEarliestSessionEndTime(wrapped(), 0);
+            return new MeteredWindowedKeyValueIterator<>(
+                    wrapped().findSessions(
+                            keyBytes(key), actualEarliestSessionEndTime, 
Long.MAX_VALUE),
+                    fetchSensor,
+                    streamsMetrics,
+                    serdes,
+                    time);
+        }
+
         return new MeteredWindowedKeyValueIterator<>(

Review comment:
       I know in this issue, we only want to improve the non-inMemory window 
stores, but do you think we can make them return the 
`MeteredWindowedKeyValueIterator` with `actualEarliestSessionEndTime` ? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -239,9 +254,31 @@ public V fetchSession(final K key, final long 
earliestSessionEndTime, final long
         );
     }
 
+    private long getObservedStreamTime(final StateStore stateStore) {
+        if (stateStore instanceof PersistentSessionStore) {
+            return ((PersistentSessionStore) 
stateStore).getObservedStreamTime();
+        } else if (stateStore instanceof WrappedStateStore) {
+            return getObservedStreamTime(((WrappedStateStore) 
stateStore).wrapped());
+        } else {
+            return ConsumerRecord.NO_TIMESTAMP;
+        }
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+
+        if (wrapped().persistent()) {
+            final long actualEarliestSessionEndTime = 
getActualEarliestSessionEndTime(wrapped(), 0);
+            return new MeteredWindowedKeyValueIterator<>(
+                    wrapped().findSessions(
+                            keyBytes(key), actualEarliestSessionEndTime, 
Long.MAX_VALUE),
+                    fetchSensor,
+                    streamsMetrics,
+                    serdes,
+                    time);
+        }
+
         return new MeteredWindowedKeyValueIterator<>(

Review comment:
       And same comments to below similar places.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
##########
@@ -60,7 +103,8 @@ public void shouldRemoveExpired() {
         try (final KeyValueIterator<Windowed<String>, Long> iterator =
             sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
         ) {
-            assertEquals(valuesToSet(iterator), new 
HashSet<>(Arrays.asList(2L, 3L, 4L)));
+            assertEquals(valuesToSet(iterator), new 
HashSet<>(Collections.singletonList(4L)));

Review comment:
       Thanks for the comments added. 
   So, the comment in line 100 is not valid anymore, (or should be updated), 
right?
   // Advance stream time to expire the first record

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -201,6 +205,9 @@ public V fetch(final K key,
         Objects.requireNonNull(key, "key cannot be null");
         return maybeMeasureLatency(
             () -> {
+                if (wrapped().persistent() && timestamp <= 
getObservedStreamTime(wrapped()) - retentionPeriod) {
+                    return null;

Review comment:
       I think `wrapped().persistent()` can be removed. This logic should apply 
to whole window stores, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to