RainerMatthiasS commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1306984228


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple 
partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke 
{@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that leads to
+ * completely enumerating and buffering nested iterators even for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a href="https://bugs.openjdk.org/browse/JDK-8267359">JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
+public class MultiStateKeyIterator<K> implements CloseableIterator<K> {
     private final List<? extends StateDescriptor<?, ?>> descriptors;
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, 
KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
 
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), 
VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        
registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from 
configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
+    }
+
+    @Override
+    public void close() throws Exception {
+        registry.close();
     }
 
     @Override
     public boolean hasNext() {
-        return internal.hasNext();
+        // Keep advancing to the next descriptor until a key stream with remaining keys is found.
+        // A plain 'if' would stop at the first empty stream and wrongly report exhaustion even
+        // though later descriptors may still hold keys.
+        while (innerIter == null || !innerIter.hasNext()) {
+            if (!outerIter.hasNext()) {
+                return false;
+            }
+            StateDescriptor<?, ?> descriptor = outerIter.next();
+            Stream<K> stream = backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE);
+            innerIter = stream.iterator();
+            try {
+                // Register every opened stream so close() releases it.
+                registry.registerCloseable(stream::close);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read keys from configured StateBackend", e);
+            }
+        }
+        return true;
     }
 
     @Override
     public K next() {
-        currentKey = internal.next();
-        return currentKey;
+        // hasNext() also positions innerIter on the next non-exhausted key stream.
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        // Remember the returned key as the "current key" referred to by remove().
+        currentKey = innerIter.next();
+        return currentKey;
     }
 
-    /** Removes the current key from <b>ALL</b> known states in the state 
backend. */

Review Comment:
   It is an oversight ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to