RainerMatthiasS commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1306992807

##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -111,9 +128,4 @@ public void remove() {
                 }
             }
         }
-
-    @Override
-    public void close() throws Exception {

Review Comment:
   Sure :smile:. It's just that the two implementations are unrelated and only happen to have the same name. (Should I move this down?)

##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke {@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that led to
+ * completely enumerating and buffering nested iterators even for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {

Review Comment:
   :+1:

##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                     .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   @masteryhx Is there anything else I can do to move this PR forward?

##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                     .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   No, it actually tests the 'economy' of `hasNext()`: it checks that a single call to `hasNext()` does **not** iterate the whole collection, which is exactly what the original, Java-streams-based implementation does (the [JDK-8267359](https://bugs.openjdk.org/browse/JDK-8267359) flaw). So this test is exactly as intended. Completeness of enumeration is tested in `testIteratorPullsKeyFromAllDescriptors()`.

##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                     .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   The original implementation of `MultiStateKeyIterator` would iterate all 1e8 keys and store them in a `SpinedBuffer` just to answer a single initial call to `hasNext()`. The test asserts that only a single key is touched. Technically, enumerating all keys is still a correct implementation, but a wasteful one that might even lead to OOM errors. We only count how many keys are actually enumerated instead of observing the actual memory (GC) consumption, because the latter would be too brittle for a unit test.

##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                     .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
+        CountingManyKeysKeyedStateBackend keyedStateBackend =
+                createManyKeysKeyedStateBackend(100_000_000);
+        MultiStateKeyIterator<Integer> testedIterator =
+                new MultiStateKeyIterator<>(descriptors, keyedStateBackend);
+
+        boolean hasnext = testedIterator.hasNext();
+
+        Assert.assertEquals(
+                "Unexpected number of keys enumerated",
+                1,
+                keyedStateBackend.numberOfKeysEnumerated);
+    }
+
+    /**
+     * Mockup {@link AbstractKeyedStateBackend} that counts how many keys are enumerated.
+     *
+     * <p>Generates a configured number of integer keys, only method actually implemented is {@link
+     * CountingManyKeysKeyedStateBackend#getKeys(java.lang.String, java.lang.Object)}
+     */
+    static class CountingManyKeysKeyedStateBackend extends AbstractKeyedStateBackend<Integer> {

Review Comment:
   :+1: