showuon commented on code in PR #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r852555527
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##########
@@ -234,7 +275,11 @@ public void put(final Bytes key,
 
     @Override
     public byte[] get(final Bytes key) {
-        final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final long timestampFromKey = keySchema.segmentTimestamp(key);
+        // check if timestamp is expired
+        if (timestampFromKey < observedStreamTime - retentionPeriod + 1)
+            return null;

Review Comment:
   We might be able to print a debug log here to mention that the value is expired and therefore won't be returned. Same as the others.

##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -157,43 +157,34 @@ public void shouldPutAndFetch() {
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
+            // all records expired as actual from is 59001 and to is 1000

Review Comment:
   Could you make it clear where the 59001 comes from? (I think it's observed time 60000 - retention 1000 + 1, but please spell that out in the comment.) Although we can understand that the emptyIterator is returned because the records are expired, it is unclear where the 59001 comes from. Thanks.

##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -157,43 +157,34 @@ public void shouldPutAndFetch() {
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
-            );
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
 
             assertEquals(expected, toList(values));
         }
 
         try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
             Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
 
-            final List<KeyValue<Windowed<String>, Long>> expected = asList(
-                KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
-                KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
-            );
+            // all records expired as actual from is 59001 and to is 1000
+            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();

Review Comment:
   Thanks for making the tests work as expected. But I'm afraid this changes the original test goal and might not catch an error when things go wrong. Could we have 2 tests: one with a large retention period, so that we keep the original assertions, and the other with the modification you made here? Same comment applies to the other tests. Thanks.
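A minimal sketch of the debug log suggested in the first review comment above, on `get(...)`. It assumes the store class keeps an SLF4J `LOG` field (as other Kafka Streams segmented-store classes do) and that the method otherwise falls through to the segment lookup shown in the diff; the log message wording is illustrative only:

    @Override
    public byte[] get(final Bytes key) {
        final long timestampFromKey = keySchema.segmentTimestamp(key);
        // check if timestamp is expired; log at debug level so skipped look-ups are visible while troubleshooting
        if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
            LOG.debug("Record for key {} is expired: key timestamp {} < observedStreamTime {} - retentionPeriod {} + 1",
                key, timestampFromKey, observedStreamTime, retentionPeriod);
            return null;
        }
        final S segment = segments.getSegmentForTimestamp(timestampFromKey);
        if (segment == null) {
            return null;
        }
        return segment.get(key);
    }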
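For the 59001 question in the second comment, the in-test comment could spell out the arithmetic along these lines. The concrete numbers (observed stream time 60000, retention period 1000) follow the reviewer's own reading of the test setup and should be verified against it:

            // The latest put advanced observedStreamTime to 60000 and the retention period is 1000,
            // so the earliest non-expired timestamp is actualFrom = 60000 - 1000 + 1 = 59001.
            // The requested range [0, 1000] ends before 59001, hence no records are returned.
            final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();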
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##########
@@ -87,23 +90,35 @@ public KeyValueIterator<Bytes, byte[]> backwardFetch(final Bytes key,
         return fetch(key, from, to, false);
     }
 
+
     KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to, final boolean forward) {
-        final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
-        final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+        final long actualFrom = getActualFrom(from);
+
+        if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+            return KeyValueIterators.emptyIterator();

Review Comment:
   Could you help me understand why we only return `emptyIterator` when the schema is a `WindowKeySchema`? Why not the other schema types?

##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java:
##########
@@ -181,15 +189,16 @@ public void testRolling() {
             ),
             segmentDirs(baseDir)
         );
-
+        // expired record
         assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
+            new HashSet<>(Collections.emptyList()),

Review Comment:
   Could we make the retention period longer to keep the original tests? I'm afraid this change will not catch the expected error when the code changes unexpectedly. I'm thinking we can have 2 tests: one with a long retention period, and the other with the current setting.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
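As a reading aid for the `getActualFrom` question above: the helper's body is not part of the quoted hunks, but from the expiry check added to `get(...)` it presumably clamps the requested `from` to the earliest non-expired timestamp, for example:

    // assumed shape of the helper used in fetch(): clamp the requested `from` to the
    // earliest timestamp that has not yet expired under the retention period
    private long getActualFrom(final long from) {
        return Math.max(from, observedStreamTime - retentionPeriod + 1);
    }

Under that reading, `to < actualFrom` means the whole requested range has already expired, so an empty iterator can be returned immediately; the open question in the review is why that early return is limited to `WindowKeySchema`.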