masteryhx commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1307015491
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
 
 /**
  * An iterator for reading all keys in a state backend across multiple partitioned states.
  *
  * <p>To read unique keys across all partitioned states callers must invoke {@link
  * MultiStateKeyIterator#remove}.
  *
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that lead to
+ * completely enumerating and buffering nested iterators event for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
  * @param <K> Type of the key by which state is keyed.
  */
 @Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
+public class MultiStateKeyIterator<K> implements CloseableIterator<K> {
     private final List<? extends StateDescriptor<?, ?>> descriptors;
 
     private final KeyedStateBackend<K> backend;
 
-    private final Iterator<K> internal;
+    private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+    private Iterator<K> innerIter;
 
     private final CloseableRegistry registry;
 
     private K currentKey;
 
     public MultiStateKeyIterator(
             List<? extends StateDescriptor<?, ?>> descriptors, KeyedStateBackend<K> backend) {
+
+        outerIter = descriptors.iterator();
+        innerIter = null;
+
         this.descriptors = Preconditions.checkNotNull(descriptors);
         this.backend = Preconditions.checkNotNull(backend);
         this.registry = new CloseableRegistry();
-        this.internal =
-                descriptors.stream()
-                        .map(
-                                descriptor ->
-                                        backend.getKeys(
-                                                descriptor.getName(), VoidNamespace.INSTANCE))
-                        .peek(
-                                stream -> {
-                                    try {
-                                        registry.registerCloseable(stream::close);
-                                    } catch (IOException e) {
-                                        throw new RuntimeException(
-                                                "Failed to read keys from configured StateBackend",
-                                                e);
-                                    }
-                                })
-                        .flatMap(stream -> stream)
-                        .iterator();
+    }
+
+    @Override
+    public void close() throws Exception {
+        registry.close();
     }
 
     @Override
     public boolean hasNext() {
-        return internal.hasNext();
+        if ((innerIter == null || !innerIter.hasNext()) && outerIter.hasNext()) {
+            StateDescriptor<?, ?> descriptor = outerIter.next();
+            Stream<K> stream = backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE);
+            innerIter = stream.iterator();
+            try {
+                registry.registerCloseable(stream::close);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read keys from configured StateBackend", e);
+            }
+        }
+        if (innerIter == null) {
+            return false;
+        }
+        return innerIter.hasNext();

Review Comment:
Yeah, I think you should. IIUC, even in this case there can be some corner cases:
1. The `StateDescriptor`s are passed in by users, so they may not be correct.
2. If a user only registered a state but never put any values into it, the state will be empty (only the state metadata is stored in the checkpoint, and this also works).

In such cases the key stream for a descriptor may be empty, and with the single `if` above, `hasNext()` would then return `false` even though later descriptors may still have keys.

Of course, even if it works so far, we should also guarantee that this class works correctly on its own, without being coupled to other classes.
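For illustration, a minimal Flink-free sketch of a `hasNext()` that skips empty key streams, which would cover both cases above. It is not the PR's code: the class name `LazyKeyConcatIterator` and the `keysFor` function (standing in for `backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE)`) are hypothetical, and a plain list of opened streams replaces the `CloseableRegistry`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Stream;

/**
 * Hypothetical sketch, not Flink code. keysFor stands in for
 * backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE); the opened list stands in
 * for the CloseableRegistry.
 */
final class LazyKeyConcatIterator<D, K> implements Iterator<K>, AutoCloseable {
    private final Iterator<? extends D> outerIter;
    private final Function<? super D, Stream<K>> keysFor;
    private final List<Stream<K>> opened = new ArrayList<>();
    private Iterator<K> innerIter;

    LazyKeyConcatIterator(List<? extends D> descriptors, Function<? super D, Stream<K>> keysFor) {
        this.outerIter = descriptors.iterator();
        this.keysFor = keysFor;
    }

    @Override
    public boolean hasNext() {
        // A loop instead of a single `if`: keep opening the next descriptor's key stream
        // until one has a key, so an empty stream does not end the iteration early.
        while (innerIter == null || !innerIter.hasNext()) {
            if (!outerIter.hasNext()) {
                return false;
            }
            Stream<K> stream = keysFor.apply(outerIter.next());
            opened.add(stream); // remember it so close() can release it later
            innerIter = stream.iterator();
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return innerIter.next();
    }

    @Override
    public void close() {
        // Close every stream opened so far, similar to closing a CloseableRegistry.
        for (Stream<K> stream : opened) {
            stream.close();
        }
        opened.clear();
    }
}
```

The `while` loop lets one `hasNext()` call skip any number of empty key streams, yet each descriptor's stream is still opened only after the previous one is exhausted, so the laziness that motivated dropping `flatMap` is kept.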