RainerMatthiasS commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1306992807


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -111,9 +128,4 @@ public void remove() {
             }
         }
     }
-
-    @Override
-    public void close() throws Exception {

Review Comment:
   Sure :smile:. Note, though, that the two implementations are unrelated and only happen to share the same name.
   (Should I move this down?)
   
   
   



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke {@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that led to
+ * completely enumerating and buffering nested iterators even for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {

Review Comment:
   :+1:



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   @masteryhx Is there anything else I can do in order to progress this PR?



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   No, it actually tests the 'economy' of `hasNext()`: a single call to `hasNext()` must **not** iterate the whole collection. The original implementation, which is based on Java streams, does exactly that because of the [JDK-8267359](https://bugs.openjdk.org/browse/JDK-8267359) flaw.
   I.e. this test is exactly as intended.
   Completeness of enumeration is tested in `testIteratorPullsKeyFromAllDescriptors()`.
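
   To illustrate what "lazy" means here, below is a minimal sketch of a hand-written flattening iterator that only opens inner iterators on demand. The class and method names (`LazyFlattenSketch`, `FlatteningIterator`, `innerOpenedAfterFirstHasNext`) are hypothetical and used for illustration only; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyFlattenSketch {

    /** Hypothetical lazy flattening iterator (illustration only, not the Flink class). */
    static final class FlatteningIterator<S, T> implements Iterator<T> {
        private final Iterator<S> outer;
        private final Function<S, Iterator<T>> mapper;
        private Iterator<T> inner = Collections.emptyIterator();
        int innerOpened = 0; // how many inner iterators have been created so far

        FlatteningIterator(Iterator<S> outer, Function<S, Iterator<T>> mapper) {
            this.outer = outer;
            this.mapper = mapper;
        }

        @Override
        public boolean hasNext() {
            // Advance to the next non-empty inner iterator without draining it.
            while (!inner.hasNext() && outer.hasNext()) {
                inner = mapper.apply(outer.next());
                innerOpened++;
            }
            return inner.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return inner.next();
        }
    }

    /** Flattens all groups in order, to check completeness of the enumeration. */
    static List<String> flatten(List<List<String>> groups) {
        FlatteningIterator<List<String>, String> it =
                new FlatteningIterator<>(groups.iterator(), g -> g.iterator());
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    /** Number of inner iterators opened by one initial call to hasNext(). */
    static int innerOpenedAfterFirstHasNext(List<List<String>> groups) {
        FlatteningIterator<List<String>, String> it =
                new FlatteningIterator<>(groups.iterator(), g -> g.iterator());
        it.hasNext();
        return it.innerOpened;
    }
}
```

   With a non-empty first group, a single `hasNext()` opens only that first inner iterator; later groups are not touched until they are needed.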



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {

Review Comment:
   The original implementation of `MultiStateKeyIterator` would iterate over all 1e8 keys and store them in a `SpinedBuffer`, just for a single initial call to `hasNext()`. The test asserts that only a single key is touched. Technically, enumerating all keys is still a correct implementation, but a wasteful one that might even lead to OOM errors.
   We only count how many keys are actually enumerated instead of observing the actual memory (GC) consumption, which would be too brittle for a unit test.
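
   The counting approach can be sketched in isolation as follows. This is a simplified illustration under assumptions: `CountingKeys`, `EagerIterator` and `LazyIterator` are hypothetical stand-ins (not Flink classes), and the eager variant merely imitates the buffering behaviour described above with a plain `ArrayDeque` rather than a `SpinedBuffer`.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class KeyCountingSketch {

    /** Hypothetical key source: yields 0..n-1 and counts every key it hands out. */
    static final class CountingKeys implements Iterator<Integer> {
        private final int n;
        private int next = 0;
        int enumerated = 0;

        CountingKeys(int n) {
            this.n = n;
        }

        @Override
        public boolean hasNext() {
            return next < n;
        }

        @Override
        public Integer next() {
            if (next >= n) {
                throw new NoSuchElementException();
            }
            enumerated++;
            return next++;
        }
    }

    /** Eager variant: drains and buffers the whole source on first use. */
    static final class EagerIterator implements Iterator<Integer> {
        private final Iterator<Integer> source;
        private ArrayDeque<Integer> buffer;

        EagerIterator(Iterator<Integer> source) {
            this.source = source;
        }

        private void fill() {
            if (buffer == null) {
                buffer = new ArrayDeque<>();
                source.forEachRemaining(buffer::add);
            }
        }

        @Override
        public boolean hasNext() {
            fill();
            return !buffer.isEmpty();
        }

        @Override
        public Integer next() {
            fill();
            if (buffer.isEmpty()) {
                throw new NoSuchElementException();
            }
            return buffer.poll();
        }
    }

    /** Lazy variant: looks ahead by at most one key. */
    static final class LazyIterator implements Iterator<Integer> {
        private final Iterator<Integer> source;
        private Integer lookahead;

        LazyIterator(Iterator<Integer> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            if (lookahead == null && source.hasNext()) {
                lookahead = source.next();
            }
            return lookahead != null;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Integer result = lookahead;
            lookahead = null;
            return result;
        }
    }

    /** Counts how many keys a single initial hasNext() pulls from the source. */
    static int keysEnumeratedByFirstHasNext(boolean eager, int n) {
        CountingKeys keys = new CountingKeys(n);
        Iterator<Integer> it = eager ? new EagerIterator(keys) : new LazyIterator(keys);
        it.hasNext();
        return keys.enumerated;
    }
}
```

   Both variants return the same keys in the same order, so a test of completeness cannot tell them apart; only the counter exposes the difference (all `n` keys for the eager variant, a single key for the lazy one).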



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
                             .count());
         }
     }
+
+    /** Test for lazy enumeration of inner iterators. */
+    @Test
+    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
+        CountingManyKeysKeyedStateBackend keyedStateBackend =
+                createManyKeysKeyedStateBackend(100_000_000);
+        MultiStateKeyIterator<Integer> testedIterator =
+                new MultiStateKeyIterator<>(descriptors, keyedStateBackend);
+
+        boolean hasnext = testedIterator.hasNext();
+
+        Assert.assertEquals(
+                "Unexpected number of keys enumerated",
+                1,
+                keyedStateBackend.numberOfKeysEnumerated);
+    }
+
+    /**
+     * Mockup {@link AbstractKeyedStateBackend} that counts how many keys are enumerated.
+     *
+     * <p>Generates a configured number of integer keys; the only method actually implemented is {@link
+     * CountingManyKeysKeyedStateBackend#getKeys(java.lang.String, java.lang.Object)}.
+     */
+    static class CountingManyKeysKeyedStateBackend extends AbstractKeyedStateBackend<Integer> {

Review Comment:
   :+1:


