I tend not to be in favor of this proposal, as union state is somewhat abused
in several popular source connectors (e.g. Kafka), and increasing this value
could lead to a JM OOM when sending TDDs (TaskDeploymentDescriptors) from the
JM to TMs at large parallelism.

After we collect the union state and initialize the map list [1], the union
state is already prepared for assignment. At this point the memory footprint
has not increased much, because the union state shared across tasks still
points to the same ByteStreamStateHandle references. However, once we send the
TDDs with the taskRestore to the TMs, Akka serializes those
ByteStreamStateHandles within each TDD, and the shared references become
independent copies, which increases the memory footprint. If the source has a
parallelism of 1024 and each sub-task's state sits near the 100KB threshold,
every sub-task would then receive 1024 * 100KB of state handles, because union
state hands the complete union to each sub-task. The total memory footprint
cannot be ignored.
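
To make the numbers concrete, here is a back-of-envelope sketch in Java
(purely illustrative; the 1024 parallelism and 100KB-per-sub-task figures are
the example values above, and the worst case assumes all serialized TDDs are
buffered at the same time):

public class UnionStateFootprint {
    public static void main(String[] args) {
        int parallelism = 1024;               // source parallelism (illustrative)
        long inlinePerSubtask = 100 * 1024L;  // ~100KB of union state kept inline per sub-task

        // Union state hands every sub-task the handles of ALL sub-tasks, so each
        // serialized TDD carries roughly parallelism * inlinePerSubtask bytes.
        long perTdd = parallelism * inlinePerSubtask;  // ~100 MB per TDD
        long allTdds = parallelism * perTdd;           // ~100 GB if every serialized TDD is buffered at once

        System.out.printf("per TDD: %d MB, worst case across all TDDs: %d GB%n",
                perTdd >> 20, allTdds >> 30);
    }
}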

If we plan to increase the default value of state.backend.fs.memory-threshold,
we should first resolve the above case.
In other words, this proposal is a trade-off: it would benefit perhaps 99% of
users, but might hurt the 1% who run large-scale Flink jobs.


[1] 
https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87

Best
Yun Tang


________________________________
From: Yu Li <car...@gmail.com>
Sent: Thursday, May 14, 2020 23:51
To: Till Rohrmann <trohrm...@apache.org>
Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 
100K

TL;DR: I have some reservations but tend to be +1 for the proposal,
meanwhile suggest we have a more thorough solution in the long run.

Please correct me if I'm wrong, but it seems the root cause of the issue is
that too many small files are generated.

I have some concerns for the case of session cluster [1], as well as
possible issues for users at large scale, otherwise I think increasing
`state.backend.fs.memory-threshold` to 100K is a good choice, based on the
assumption that a large portion of our users are running small jobs with
small states.

OTOH, maybe extending the solution [2] for the RocksDB small file problem (as
proposed by FLINK-11937 [3]) to also support operator state could be an
alternative? We have already applied that solution to operator state in
production, and it solved the HDFS NN RPC bottleneck problem during last
year's Singles' Day.

Best Regards,
Yu

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
[2]
https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
[3] https://issues.apache.org/jira/browse/FLINK-11937


On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:

> I cannot say much about the concrete value but if our users have problems
> with the existing default values, then it makes sense to me to change it.
>
> One thing to check could be whether it is possible to provide a meaningful
> exception in case the state size exceeds the frame size. At the
> moment, Flink should fail with a message saying that a rpc message exceeds
> the maximum frame size. Maybe it is also possible to point the user towards
> "state.backend.fs.memory-threshold" if the message exceeds the frame size
> because of too much state.
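
A rough sketch of the kind of check suggested above (not the actual Flink
code path; the helper name and parameters are made up):

// Hypothetical helper: fail fast with a pointer to the relevant options
// instead of a bare "frame size exceeded" error from the RPC layer.
static void checkTddFitsInFrame(byte[] serializedTdd, long maxFrameSizeBytes) {
    if (serializedTdd.length > maxFrameSizeBytes) {
        throw new IllegalStateException(
                "The serialized TaskDeploymentDescriptor (" + serializedTdd.length
                + " bytes) exceeds the maximum akka frame size (" + maxFrameSizeBytes
                + " bytes). If this is caused by state inlined in the checkpoint "
                + "metadata, consider lowering 'state.backend.fs.memory-threshold' "
                + "or increasing 'akka.framesize'.");
    }
}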
>
> Cheers,
> Till
>
> On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
>
>> The parameter "state.backend.fs.memory-threshold" decides when a state
>> will
>> become a file and when it will be stored inline with the metadata (to
>> avoid
>> excessive amounts of small files).
>>
>> By default, this threshold is 1K - so every state above that size becomes
>> a
>> file. For many cases, this threshold seems to be too low.
>> There is an interesting talk with background on this from Scott Kidder:
>> https://www.youtube.com/watch?v=gycq0cY3TZ0
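
For readers following the thread, a simplified sketch of that decision
(illustrative only, not the real implementation; it roughly corresponds to
what the FsCheckpointStreamFactory does when a checkpoint stream is closed,
and writeStateToFile below is a made-up helper):

// ByteStreamStateHandle, FileStateHandle and StreamStateHandle are Flink's
// existing state handle classes.
static StreamStateHandle closeAndGetHandle(byte[] stateBytes, int memoryThreshold)
        throws IOException {
    if (stateBytes.length <= memoryThreshold) {
        // Small state: keep the bytes inline; they travel with the checkpoint
        // metadata and the RPC messages.
        return new ByteStreamStateHandle(UUID.randomUUID().toString(), stateBytes);
    } else {
        // Larger state: write a separate (possibly still small) file on
        // S3/GCS/HDFS and only ship a pointer to it.
        Path file = writeStateToFile(stateBytes);
        return new FileStateHandle(file, stateBytes.length);
    }
}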
>>
>> I wanted to discuss increasing this to 100K by default.
>>
>> Advantage:
>>   - This should help many users out of the box, which otherwise see
>> checkpointing problems on systems like S3, GCS, etc.
>>
>> Disadvantage:
>>   - For very large jobs, this increases the required heap memory on the JM
>> side, because more state needs to be kept in-line when gathering the acks
>> for a pending checkpoint.
>>   - If tasks have a lot of states and each state is roughly at this
>> threshold, we increase the chance of exceeding the RPC frame size and
>> failing the job.
>>
>> What do you think?
>>
>> Best,
>> Stephan
>>
>
