I see, thanks for all the input.

I agree with Yun Tang that the use of UnionState is problematic and can
cause issues in conjunction with this change. However, most of the
large-scale users I know who struggle with UnionState have also increased
this threshold, because with the low default they get an excessive number
of small files and overwhelm their HDFS / S3 / etc.
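For illustration, the increase those users apply is a one-line change in
flink-conf.yaml. This is a sketch, assuming the option takes a plain byte
count as in the 1.10.x releases; 102400 is the 100K value under
discussion, not the current default:

    # flink-conf.yaml
    # State chunks at or below this size are stored inline in the
    # checkpoint metadata instead of as separate files on HDFS / S3.
    # Default today is 1024 (1K); 102400 = 100K.
    state.backend.fs.memory-threshold: 102400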
An intermediate solution could be to put the increased value into the
default configuration. That way, existing setups with existing configs
will not be affected, but new users / installations will have a simpler
time.

Best,
Stephan

On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:

> I tend to be not in favor of this proposal, as union state is somewhat
> abused in several popular source connectors (e.g. Kafka), and increasing
> this value could lead to JM OOM when sending TDDs (task deployment
> descriptors) from the JM to the TMs at large parallelism.
>
> After we collect the union state and initialize the map list [1], we
> already have the union state ready to assign. At this point the memory
> footprint has not increased much, since the union state shared across
> tasks holds references to the same ByteStreamStateHandles. However, when
> we send the TDDs with the taskRestore to the TMs, Akka serializes those
> ByteStreamStateHandles within each TDD, which increases the memory
> footprint. If the source has a parallelism of 1024, each sub-task's TDD
> would then carry 1024 * 100KB of state handles (roughly 100 MB per TDD,
> or about 100 GB across all 1024 TDDs). The total memory footprint cannot
> be ignored.
>
> If we plan to increase the default value of
> state.backend.fs.memory-threshold, we should first resolve the above
> case. In other words, this proposal could be a trade-off which benefits
> perhaps 99% of users, but might bring harmful effects to the 1% of users
> with large-scale Flink jobs.
>
> [1]
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
>
> Best
> Yun Tang
>
> ________________________________
> From: Yu Li <car...@gmail.com>
> Sent: Thursday, May 14, 2020 23:51
> To: Till Rohrmann <trohrm...@apache.org>
> Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> TL;DR: I have some reservations but tend to be +1 for the proposal;
> meanwhile, I suggest we pursue a more thorough solution in the long run.
>
> Please correct me if I'm wrong, but it seems the root cause of the issue
> is that too many small files are generated.
>
> I have some concerns about the session-cluster case [1], as well as
> possible issues for users at large scale; otherwise I think increasing
> `state.backend.fs.memory-threshold` to 100K is a good choice, based on
> the assumption that a large portion of our users are running small jobs
> with small state.
>
> OTOH, maybe extending the solution [2] for the RocksDB small-file
> problem (as proposed by FLINK-11937 [3]) to also support operator state
> could be an alternative? We have already applied that solution in
> production for operator state, and it solved the HDFS NN RPC bottleneck
> on last year's Singles' Day.
>
> Best Regards,
> Yu
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> [2]
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> [3] https://issues.apache.org/jira/browse/FLINK-11937
>
> On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
>
> > I cannot say much about the concrete value, but if our users have
> > problems with the existing default values, then it makes sense to me
> > to change it.
> >
> > One thing to check could be whether it is possible to provide a
> > meaningful exception in case the state size exceeds the frame size. At
> > the moment, Flink should fail with a message saying that an RPC message
> > exceeds the maximum frame size. Maybe it is also possible to point the
> > user towards "state.backend.fs.memory-threshold" if the message exceeds
> > the frame size because of too much state.
> >
> > Cheers,
> > Till
> >
> > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> >> The parameter "state.backend.fs.memory-threshold" decides when a state
> >> will become a file and when it will be stored inline with the metadata
> >> (to avoid excessive amounts of small files).
> >>
> >> By default, this threshold is 1K - so every state above that size
> >> becomes a file. For many cases, this threshold seems to be too low.
> >> There is an interesting talk with background on this from Scott Kidder:
> >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> >>
> >> I wanted to discuss increasing this to 100K by default.
> >>
> >> Advantage:
> >> - This should help many users out of the box, who otherwise see
> >> checkpointing problems on systems like S3, GCS, etc.
> >>
> >> Disadvantages:
> >> - For very large jobs, this increases the required heap memory on the
> >> JM side, because more state needs to be kept inline when gathering the
> >> acks for a pending checkpoint.
> >> - If tasks have a lot of states and each state is roughly at this
> >> threshold, we increase the chance of exceeding the RPC frame size and
> >> failing the job.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Stephan
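To make the decision Stephan describes concrete, here is a minimal sketch
of what the threshold controls. It is loosely modeled on Flink's
FsCheckpointStreamFactory; the class and handle names below are
simplified, hypothetical placeholders, not Flink's actual API:

    import java.io.IOException;
    import java.util.UUID;

    // Hypothetical names; sketch of the inline-vs-file decision driven
    // by state.backend.fs.memory-threshold.
    interface StateHandle {}

    final class InlineStateHandle implements StateHandle {
        final byte[] bytes; // state travels inside the checkpoint metadata
        InlineStateHandle(byte[] bytes) { this.bytes = bytes; }
    }

    final class FileStateHandle implements StateHandle {
        final String path;  // only this pointer travels in the metadata
        final long size;
        FileStateHandle(String path, long size) {
            this.path = path;
            this.size = size;
        }
    }

    final class CheckpointStateWriter {
        private final int memoryThresholdBytes; // 1024 today; 102400 proposed

        CheckpointStateWriter(int memoryThresholdBytes) {
            this.memoryThresholdBytes = memoryThresholdBytes;
        }

        StateHandle write(byte[] serializedState) throws IOException {
            if (serializedState.length <= memoryThresholdBytes) {
                // Small state: keep the bytes inline in the metadata.
                // Avoids one small file on HDFS/S3, but the bytes are
                // later shipped inside every TDD that references them.
                return new InlineStateHandle(serializedState);
            }
            // Large state: write a file, keep only the path in metadata.
            String path = writeToCheckpointStorage(serializedState);
            return new FileStateHandle(path, serializedState.length);
        }

        private String writeToCheckpointStorage(byte[] bytes)
                throws IOException {
            // Placeholder for the actual DFS write.
            return "hdfs:///checkpoints/chk-42/" + UUID.randomUUID();
        }
    }

Under this scheme, raising the threshold to 100K means any state chunk up
to 100KB stays an inline handle; with a 1024-parallelism union-state
source, each TDD then carries up to 1024 such handles, which is where Yun
Tang's ~100 MB-per-TDD concern above comes from.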