RainerMatthiasS commented on code in PR #23239: URL: https://github.com/apache/flink/pull/23239#discussion_r1306984228
########## flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java: ########## @@ -31,65 +31,82 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Stream; /** * An iterator for reading all keys in a state backend across multiple partitioned states. * * <p>To read unique keys across all partitioned states callers must invoke {@link * MultiStateKeyIterator#remove}. * + * <p>Note: This is a replacement of the original implementation which used streams with a known + * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that led to + * completely enumerating and buffering nested iterators even for a single call to {@link + * MultiStateKeyIterator#hasNext}. + * + * @see <a + * href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a> * @param <K> Type of the key by which state is keyed. */ @Internal -public final class MultiStateKeyIterator<K> implements CloseableIterator<K> { +public class MultiStateKeyIterator<K> implements CloseableIterator<K> { private final List<? extends StateDescriptor<?, ?>> descriptors; private final KeyedStateBackend<K> backend; - private final Iterator<K> internal; + private Iterator<? extends StateDescriptor<?, ?>> outerIter; + private Iterator<K> innerIter; private final CloseableRegistry registry; private K currentKey; public MultiStateKeyIterator( List<? 
extends StateDescriptor<?, ?>> descriptors, KeyedStateBackend<K> backend) { + + outerIter = descriptors.iterator(); + innerIter = null; + this.descriptors = Preconditions.checkNotNull(descriptors); this.backend = Preconditions.checkNotNull(backend); this.registry = new CloseableRegistry(); - this.internal = - descriptors.stream() - .map( - descriptor -> - backend.getKeys( - descriptor.getName(), VoidNamespace.INSTANCE)) - .peek( - stream -> { - try { - registry.registerCloseable(stream::close); - } catch (IOException e) { - throw new RuntimeException( - "Failed to read keys from configured StateBackend", - e); - } - }) - .flatMap(stream -> stream) - .iterator(); + } + + @Override + public void close() throws Exception { + registry.close(); } @Override public boolean hasNext() { - return internal.hasNext(); + if ((innerIter == null || !innerIter.hasNext()) && outerIter.hasNext()) { + StateDescriptor<?, ?> descriptor = outerIter.next(); + Stream<K> stream = backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE); + innerIter = stream.iterator(); + try { + registry.registerCloseable(stream::close); + } catch (IOException e) { + throw new RuntimeException("Failed to read keys from configured StateBackend", e); + } + } + if (innerIter == null) { + return false; + } + return innerIter.hasNext(); } @Override public K next() { - currentKey = internal.next(); - return currentKey; + if (!this.hasNext()) { + throw new NoSuchElementException(); + } else { + currentKey = this.innerIter.next(); + return currentKey; + } } - /** Removes the current key from <b>ALL</b> known states in the state backend. */ Review Comment: It is an oversight ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org