[ https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hangxiang Yu reassigned FLINK-26585:
------------------------------------

    Assignee: Matthias Schwalbe

> State Processor API: Loading a state set buffers the whole state set in memory before starting to process
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26585
>                 URL: https://issues.apache.org/jira/browse/FLINK-26585
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>    Affects Versions: 1.13.0, 1.14.0, 1.15.0
>            Reporter: Matthias Schwalbe
>            Assignee: Matthias Schwalbe
>            Priority: Major
>         Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole state in memory before it even processes a single data point
> ** This is no problem at all for small state (hence the unit tests work fine)
> ** The MultiStateKeyIterator constructor sets up a Java Stream that iterates over all state descriptors and flattens all data points contained within them
> ** The java.util.stream.Stream#flatMap function causes the whole data set to be buffered when it is enumerated later on
> ** See call stack [1]
> *** In our case this is 150e6 data points (> 1 GiB just for the pointers to the data, let alone the data itself, ~30 GiB)
> ** I'm not aware of any way to instrument Stream to avoid the problem, hence
> ** I coded an alternative implementation of MultiStateKeyIterator that avoids using Java Streams
> ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
>
> [1] Streams call stack:
> hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)
>   # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream)
>   # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)
>   # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
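The attached MultiStateKeyIteratorNoStreams is not reproduced in this notification. As a rough illustration of the idea only, assuming each state descriptor can be turned into a plain Iterator over its keys, the sketch below chains the per-descriptor iterators by hand instead of going through Stream#flatMap, so keys are requested from the underlying iterator (for example RocksStateKeysIterator) one at a time. The class name LazyConcatIterator and the openSource function are hypothetical, not Flink API, and the sketch omits anything the real Flink iterator may additionally need to handle, such as remove().

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink API): lazily concatenates the iterators opened for
 * each source (e.g. one per state descriptor) without java.util.stream, so no
 * intermediate buffer holds the flattened elements.
 */
final class LazyConcatIterator<S, K> implements Iterator<K> {
    private final Iterator<S> sources;                  // remaining sources, e.g. state descriptors
    private final Function<S, Iterator<K>> openSource;  // assumed: opens a key iterator for one source
    private Iterator<K> current = null;                 // iterator of the source being drained

    LazyConcatIterator(List<S> sources, Function<S, Iterator<K>> openSource) {
        this.sources = sources.iterator();
        this.openSource = openSource;
    }

    @Override
    public boolean hasNext() {
        // Open the next source only once the current one is exhausted; empty sources are skipped.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = openSource.apply(sources.next());
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Request exactly one element from the underlying iterator per call.
        return current.next();
    }
}
```

With a source that could yield a million keys, one hasNext()/next() pair calls next() on that source exactly once, whereas the stack trace above shows the Stream-based version draining RocksStateKeysIterator inside forEachRemaining before the first key is handed out.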