[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754401#comment-17754401
 ] 

Hangxiang Yu commented on FLINK-26585:
--------------------------------------

[~Matthias Schwalbe] 
Hi, thanks for reporting this. 
Let me try to summarize the procedure for opening a PR:
1. You may have implemented the code in your local codebase.

2. Use _git remote add_ to add both your personal Flink GitHub repo and the 
Apache Flink GitHub repo as remotes in your local codebase.

3. Rebase your code onto the target branch (e.g. master) of the Apache Flink repo.

4. Push your code to a new branch in your personal Flink GitHub repo.
You will then find a Pull Request button in your personal repo; follow the 
steps it prompts you through.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26585
>                 URL: https://issues.apache.org/jira/browse/FLINK-26585
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.13.0, 1.14.0, 1.15.0
>            Reporter: Matthias Schwalbe
>            Priority: Major
>         Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I’m not aware of any instrumentation of Stream that would avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream)  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)  # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
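
For illustration only, the stream-free idea mentioned in the issue description 
could look roughly like the sketch below. This is not the attached 
MultiStateKeyIteratorNoStreams.java and not Flink code: the class name 
LazyFlatteningIterator and the "open" function are hypothetical, chosen just 
for this example. It flattens nested iterators by hand, so an inner iterator is 
only advanced when the caller asks for the next element.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): lazily flattens a sequence of
 * sources into one iterator without going through java.util.stream.
 */
final class LazyFlatteningIterator<S, K> implements Iterator<K> {
    // Outer sequence, e.g. one entry per state descriptor (assumption).
    private final Iterator<S> sources;
    // Opens the inner iterator for one source, e.g. its keys (assumption).
    private final Function<S, Iterator<K>> open;
    // Inner iterator currently being drained; null until first use.
    private Iterator<K> current;

    LazyFlatteningIterator(Iterator<S> sources, Function<S, Iterator<K>> open) {
        this.sources = sources;
        this.open = open;
    }

    @Override
    public boolean hasNext() {
        // Skip exhausted or empty inner iterators; no elements are consumed here.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = open.apply(sources.next());
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Exactly one element is pulled from the inner iterator per call.
        return current.next();
    }
}
```

With such an iterator, memory use is bounded by whatever the inner iterators 
hold themselves, since nothing is collected into an intermediate buffer.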



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
