Please correct me if I am wrong, "put the increased value into the default 
configuration" means
we will update that in default flink-conf.yaml but still leave the default 
value of `state.backend.fs.memory-threshold`as previously?
It seems I did not get the point why existing setups with existing configs will 
not be affected.

The concern I raised is because one of our large-scale job with 1024 
parallelism source of union state meet the JM OOM problem when we increase this 
value.
I think if we introduce memory control when serializing TDD asynchronously [1], 
we could be much more confident to increase this configuration as the memory 
footprint
expands at that time by a lot of serialized TDDs.


[1] 
https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752

Best
Yun Tang

________________________________
From: Stephan Ewen <se...@apache.org>
Sent: Friday, May 15, 2020 16:53
To: dev <dev@flink.apache.org>
Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 
100K

I see, thanks for all the input.

I agree with Yun Tang that the use of UnionState is problematic and can
cause issues in conjunction with this.
However, most of the large-scale users I know that also struggle with
UnionState have also increased this threshold, because with this low
threshold, they get an excess number of small files and overwhelm their
HDFS / S3 / etc.

An intermediate solution could be to put the increased value into the
default configuration. That way, existing setups with existing configs will
not be affected, but new users / installations will have a simper time.

Best,
Stephan


On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:

> Tend to be not in favor of this proposal as union state is somewhat abused
> in several popular source connectors (e.g. kafka), and increasing this
> value could lead to JM OOM when sending tdd from JM to TMs with large
> parallelism.
>
> After we collect union state and initialize the map list [1], we already
> have union state ready to assign. At this time, the memory footprint has
> not increase too much as the union state which shared across tasks have the
> same reference of ByteStreamStateHandle. However, when we send tdd with the
> taskRestore to TMs, akka will serialize those ByteStreamStateHandle within
> tdd to increases the memory footprint. If the source have 1024
> parallelisms, and any one of the sub-task would then have 1024*100KB size
> state handles. The sum of total memory footprint cannot be ignored.
>
> If we plan to increase the default value of
> state.backend.fs.memory-threshold, we should first resolve the above case.
> In other words, this proposal could be a trade-off, which benefit perhaps
> 99% users, but might bring harmful effects to 1% user with large-scale
> flink jobs.
>
>
> [1]
> https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
>
> Best
> Yun Tang
>
>
> ________________________________
> From: Yu Li <car...@gmail.com>
> Sent: Thursday, May 14, 2020 23:51
> To: Till Rohrmann <trohrm...@apache.org>
> Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> TL;DR: I have some reservations but tend to be +1 for the proposal,
> meanwhile suggest we have a more thorough solution in the long run.
>
> Please correct me if I'm wrong, but it seems the root cause of the issue is
> too many small files generated.
>
> I have some concerns for the case of session cluster [1], as well as
> possible issues for users at large scale, otherwise I think increasing
> `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> assumption that a large portion of our users are running small jobs with
> small states.
>
> OTOH, maybe extending the solution [2] of resolving RocksDB small file
> problem (as proposed by FLINK-11937 [3]) to also support operator state
> could be an alternative? We have already applied the solution in production
> for operator state and solved the HDFS NN RPC bottleneck problem on last
> year's Singles' day.
>
> Best Regards,
> Yu
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> [2]
>
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg
> <
> https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> >
> [3] https://issues.apache.org/jira/browse/FLINK-11937
>
>
> On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
>
> > I cannot say much about the concrete value but if our users have problems
> > with the existing default values, then it makes sense to me to change it.
> >
> > One thing to check could be whether it is possible to provide a
> meaningful
> > exception in case that the state size exceeds the frame size. At the
> > moment, Flink should fail with a message saying that a rpc message
> exceeds
> > the maximum frame size. Maybe it is also possible to point the user
> towards
> > "state.backend.fs.memory-threshold" if the message exceeds the frame size
> > because of too much state.
> >
> > Cheers,
> > Till
> >
> > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> >
> >> The parameter "state.backend.fs.memory-threshold" decides when a state
> >> will
> >> become a file and when it will be stored inline with the metadata (to
> >> avoid
> >> excessive amounts of small files).
> >>
> >> By default, this threshold is 1K - so every state above that size
> becomes
> >> a
> >> file. For many cases, this threshold seems to be too low.
> >> There is an interesting talk with background on this from Scott Kidder:
> >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> >>
> >> I wanted to discuss increasing this to 100K by default.
> >>
> >> Advantage:
> >>   - This should help many users out of the box, which otherwise see
> >> checkpointing problems on systems like S3, GCS, etc.
> >>
> >> Disadvantage:
> >>   - For very large jobs, this increases the required heap memory on the
> JM
> >> side, because more state needs to be kept in-line when gathering the
> acks
> >> for a pending checkpoint.
> >>   - If tasks have a lot of states and each state is roughly at this
> >> threshold, we increase the chance of exceeding the RPC frame size and
> >> failing the job.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Stephan
> >>
> >
>

Reply via email to