+1 on improving Union State implementation.

I think the concerns raised around union state are valid; meanwhile, a job
with parallelism 200 on the source operator could already be regarded as a
"large job".

As a compromise, I suggest we split the improvement into 3 steps:

1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which would
increase the memory cost on the JM side by at most 200 * 200 * 20K ≈ 800MB
for such a job; a per-job sketch follows this list)
2. Improve the union state implementation
3. Increase `state.backend.fs.memory-threshold` further
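
For step 1, besides changing flink-conf.yaml, the threshold can also be set per
job. A minimal sketch (the checkpoint URI is only a placeholder, and this assumes
the FsStateBackend constructor that takes the file-state size threshold in bytes):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.net.URI;

public class ThresholdExample {
   public static void main(String[] args) throws Exception {
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Same effect as state.backend.fs.memory-threshold: state handles smaller
      // than 20K stay inline in the checkpoint metadata instead of becoming files.
      env.setStateBackend(new FsStateBackend(new URI("hdfs:///flink/checkpoints"), 20 * 1024)); // placeholder URI
      // ... build the rest of the job as usual ...
      env.execute("threshold example");
   }
}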

What do you think? Thanks.

Best Regards,
Yu


On Sat, 16 May 2020 at 23:15, Yun Tang <myas...@live.com> wrote:

> If we cannot get rid of union state, I think we should introduce memory
> control on the serialized TDDs when deploying tasks, rather than change how
> union state is implemented when assigning state in StateAssignmentOperation.
> The duplicated TaskStateSnapshots do not really increase memory much, as the
> ByteStreamStateHandles actually share the same reference until they are
> serialized.
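>
> As a rough illustration of that point (plain Java, not Flink's actual classes):
> holding many references to one 100KB payload is cheap on the JM heap, but
> serializing each holder separately copies the bytes once per serialization.
> A minimal sketch:
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.Arrays;
> import java.util.concurrent.ThreadLocalRandom;
>
> public class SharedHandleSketch {
>    // Illustration only -- a stand-in for a state handle keeping its payload in memory.
>    static class InMemoryHandle implements Serializable {
>       final byte[] payload;
>       InMemoryHandle(byte[] payload) { this.payload = payload; }
>    }
>
>    public static void main(String[] args) throws IOException {
>       byte[] bytes = new byte[100 * 1024];
>       ThreadLocalRandom.current().nextBytes(bytes);
>       InMemoryHandle shared = new InMemoryHandle(bytes);
>
>       // 200 "task snapshots" referencing the same handle: still ~100KB on the heap.
>       InMemoryHandle[] snapshots = new InMemoryHandle[200];
>       Arrays.fill(snapshots, shared);
>
>       // Serializing each snapshot separately (as happens once per deployed task)
>       // materializes the payload again every time: ~200 * 100KB of serialized bytes.
>       long serializedBytes = 0;
>       for (InMemoryHandle snapshot : snapshots) {
>          ByteArrayOutputStream bos = new ByteArrayOutputStream();
>          try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
>             oos.writeObject(snapshot);
>          }
>          serializedBytes += bos.size();
>       }
>       System.out.println("Serialized total: " + serializedBytes + " bytes");
>    }
> }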
>
> Regarding the estimated memory footprint, I previously thought it depends on
> the pool size of the future executor (Hardware#getNumberCPUCores). However,
> with the simple program below, I found that the async task-submission logic
> makes the number of RemoteRpcInvocations existing in the JM at the same time
> larger than Hardware#getNumberCPUCores. Taking the program below as an
> example, with a source of parallelism 200, the number of RemoteRpcInvocations
> existing in the JM at the same time could be nearly 200, while our future
> executor pool size is only 96. I think if we could clear the serialized data
> in each RemoteRpcInvocation as soon as possible, we might mitigate this
> problem greatly.
>
> Simple program which uses union state to reproduce the memory footprint
> problem: each sub-task contributes a 100KB byte array to the union state, so
> with 200 sub-tasks in total the restored union state could require more than
> 100KB * 200 * 200 ≈ 3.8GB of memory.
>
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.List;
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.stream.Collectors;
> import java.util.stream.StreamSupport;
>
> public class Program {
>    private static final Logger LOG = LoggerFactory.getLogger(Program.class);
>
>    public static void main(String[] args) throws Exception {
>       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       env.enableCheckpointing(60 * 1000L);
>       env.addSource(new MySource()).setParallelism(200).print();
>       env.execute("Mock program");
>    }
>
>    private static class MySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {
>       private static final ListStateDescriptor<byte[]> stateDescriptor =
>             new ListStateDescriptor<>("list-1", byte[].class);
>       private ListState<byte[]> unionListState;
>       private volatile boolean running = true;
>
>       @Override
>       public void snapshotState(FunctionSnapshotContext context) throws Exception {
>          unionListState.clear();
>          byte[] array = new byte[100 * 1024];
>          ThreadLocalRandom.current().nextBytes(array);
>          unionListState.add(array);
>       }
>
>       @Override
>       public void initializeState(FunctionInitializationContext context) throws Exception {
>          unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
>          if (context.isRestored()) {
>             List<byte[]> collect = StreamSupport.stream(unionListState.get().spliterator(), false)
>                   .collect(Collectors.toList());
>             LOG.info("union state Collect size: {}.", collect.size());
>          }
>       }
>
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>          while (running) {
>             synchronized (ctx.getCheckpointLock()) {
>                ctx.collect(ThreadLocalRandom.current().nextInt());
>             }
>             Thread.sleep(100);
>          }
>       }
>
>       @Override
>       public void cancel() {
>          running = false;
>       }
>    }
> }
>
> Best
> Yun Tang
> ________________________________
> From: Stephan Ewen <se...@apache.org>
> Sent: Saturday, May 16, 2020 18:56
> To: dev <dev@flink.apache.org>
> Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <
> pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from
> 1K to 100K
>
> Okay, thank you for all the feedback.
>
> So we should definitely work on getting rid of the Union State, or at least
> change the way it is implemented (avoid duplicate serializer snapshots).
>
> Can you estimate from which cluster size onwards the JM heap usage becomes
> critical (if we increased the threshold to 100k, or maybe 50k)?
>
>
> On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Overall, I agree with increasing this value, but a default value of 100K may
> > be too large in my opinion.
> >
> > I want to share some more information from my side.
> >
> > The small-files problem is indeed a problem many users may encounter in
> > production. The states (keyed state and operator state) can become small
> > files in DFS, but increasing the value of `state.backend.fs.memory-threshold`
> > may run into the JM OOM problem as Yun said previously.
> > We've tried increasing this value in our production environment, but some
> > connectors which use UnionState prevented us from doing so: the memory
> > consumed by these jobs can be very large (in our case, with thousands of
> > parallelism and `state.backend.fs.memory-threshold` set to 64K, the JM would
> > consume 10G+ memory). So in the end, we used the solution proposed in
> > FLINK-11937 [1] for both keyed state and operator state.
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > Best,
> > Congxian
> >
> >
> > Yun Tang <myas...@live.com> wrote on Fri, 15 May 2020 at 21:09:
> >
> > > Please correct me if I am wrong: does "put the increased value into the
> > > default configuration" mean we will update it in the default flink-conf.yaml
> > > but still leave the default value of `state.backend.fs.memory-threshold` as
> > > it is now?
> > > It seems I did not get the point of why existing setups with existing
> > > configs will not be affected.
> > >
> > > The concern I raised is because one of our large-scale jobs, with a
> > > 1024-parallelism source using union state, met the JM OOM problem when we
> > > increased this value.
> > > I think if we introduce memory control when serializing TDDs asynchronously
> > > [1], we could be much more confident about increasing this configuration, as
> > > the memory footprint at that time is expanded by a lot of serialized TDDs.
> > >
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Friday, May 15, 2020 16:53
> > > To: dev <dev@flink.apache.org>
> > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <
> > > pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> from
> > > 1K to 100K
> > >
> > > I see, thanks for all the input.
> > >
> > > I agree with Yun Tang that the use of UnionState is problematic and can
> > > cause issues in conjunction with this.
> > > However, most of the large-scale users I know who struggle with UnionState
> > > have also increased this threshold, because with this low threshold they get
> > > an excessive number of small files and overwhelm their HDFS / S3 / etc.
> > >
> > > An intermediate solution could be to put the increased value into the
> > > default configuration. That way, existing setups with existing configs will
> > > not be affected, but new users / installations will have a simpler time.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:
> > >
> > > > I tend not to be in favor of this proposal, as union state is somewhat
> > > > abused in several popular source connectors (e.g. Kafka), and increasing
> > > > this value could lead to JM OOM when sending TDDs from the JM to TMs with
> > > > large parallelism.
> > > >
> > > > After we collect the union state and initialize the map list [1], we
> > > > already have the union state ready to assign. At this point the memory
> > > > footprint has not increased much, as the union state shared across tasks
> > > > holds the same ByteStreamStateHandle references. However, when we send the
> > > > TDDs with the taskRestore to the TMs, Akka serializes those
> > > > ByteStreamStateHandles within each TDD, which increases the memory
> > > > footprint. If the source has a parallelism of 1024, every sub-task would
> > > > then carry 1024 * 100KB (roughly 100MB) of state handles. The sum of the
> > > > total memory footprint cannot be ignored.
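> > > >
> > > > As a back-of-the-envelope sketch (a hypothetical helper, not Flink code):
> > > > with union state every restored sub-task receives the state of all
> > > > sub-tasks, so one serialized TDD carries roughly parallelism * per-sub-task
> > > > state, and all TDDs together roughly parallelism^2 * per-sub-task state.
> > > >
> > > > public class UnionStateFootprint {
> > > >    // Hypothetical back-of-the-envelope helper, not Flink code.
> > > >    // Bytes of union state carried by one serialized TDD.
> > > >    static long perTddBytes(int parallelism, long perSubtaskBytes) {
> > > >       return (long) parallelism * perSubtaskBytes;
> > > >    }
> > > >
> > > >    // Bytes across all TDDs if they are serialized around the same time.
> > > >    static long totalBytes(int parallelism, long perSubtaskBytes) {
> > > >       return (long) parallelism * perTddBytes(parallelism, perSubtaskBytes);
> > > >    }
> > > >
> > > >    public static void main(String[] args) {
> > > >       long perSubtask = 100 * 1024; // 100KB, as in the example above
> > > >       System.out.println(perTddBytes(1024, perSubtask) / (1024 * 1024) + " MB per TDD");            // 100 MB
> > > >       System.out.println(totalBytes(1024, perSubtask) / (1024L * 1024 * 1024) + " GB across all TDDs"); // 100 GB
> > > >    }
> > > > }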
> > > >
> > > > If we plan to increase the default value of
> > > > state.backend.fs.memory-threshold, we should first resolve the above case.
> > > > In other words, this proposal could be a trade-off which benefits perhaps
> > > > 99% of users, but might bring harmful effects to the 1% of users with
> > > > large-scale Flink jobs.
> > > >
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > >
> > > > ________________________________
> > > > From: Yu Li <car...@gmail.com>
> > > > Sent: Thursday, May 14, 2020 23:51
> > > > To: Till Rohrmann <trohrm...@apache.org>
> > > > Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold"
> > from
> > > > 1K to 100K
> > > >
> > > > TL;DR: I have some reservations but tend to be +1 for the proposal;
> > > > meanwhile, I suggest we work towards a more thorough solution in the long run.
> > > >
> > > > Please correct me if I'm wrong, but it seems the root cause of the issue
> > > > is that too many small files are generated.
> > > >
> > > > I have some concerns for the case of session clusters [1], as well as
> > > > possible issues for users at large scale; otherwise I think increasing
> > > > `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> > > > assumption that a large portion of our users are running small jobs with
> > > > small state.
> > > >
> > > > OTOH, maybe extending the solution [2] for the RocksDB small-file problem
> > > > (as proposed by FLINK-11937 [3]) to also support operator state could be an
> > > > alternative? We have already applied that solution to operator state in
> > > > production and solved the HDFS NN RPC bottleneck problem during last year's
> > > > Singles' Day.
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > > [1]
> > > > https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > [2]
> > > > https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h
> > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > >
> > > >
> > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > > I cannot say much about the concrete value, but if our users have
> > > > > problems with the existing default values, then it makes sense to me to
> > > > > change it.
> > > > >
> > > > > One thing to check could be whether it is possible to provide a
> > > > > meaningful exception in case the state size exceeds the frame size. At
> > > > > the moment, Flink should fail with a message saying that an RPC message
> > > > > exceeds the maximum frame size. Maybe it is also possible to point the
> > > > > user towards "state.backend.fs.memory-threshold" if the message exceeds
> > > > > the frame size because of too much state.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > >> The parameter "state.backend.fs.memory-threshold" decides when a state
> > > > >> will become a file and when it will be stored inline with the metadata
> > > > >> (to avoid excessive amounts of small files).
> > > > >>
> > > > >> By default, this threshold is 1K - so every state above that size
> > > > >> becomes a file. For many cases, this threshold seems to be too low.
> > > > >> There is an interesting talk with background on this from Scott Kidder:
> > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > >>
> > > > >> I wanted to discuss increasing this to 100K by default.
> > > > >>
> > > > >> Advantage:
> > > > >>   - This should help many users out of the box who otherwise see
> > > > >> checkpointing problems on systems like S3, GCS, etc.
> > > > >>
> > > > >> Disadvantage:
> > > > >>   - For very large jobs, this increases the required heap memory on the
> > > > >> JM side, because more state needs to be kept inline when gathering the
> > > > >> acks for a pending checkpoint.
> > > > >>   - If tasks have a lot of states and each state is roughly at this
> > > > >> threshold, we increase the chance of exceeding the RPC frame size and
> > > > >> failing the job.
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Best,
> > > > >> Stephan
> > > > >>
> > > > >
> > > >
> > >
> >
>
