+1 on improving the Union State implementation. I think the concerns raised around union state are valid; meanwhile, jobs with 200 parallelism on the source operator could be regarded as "large jobs".
To compromise, I suggest we split the improvements for this issue into 3 steps:

1. Increase `state.backend.fs.memory-threshold` from 1K to 20K (which will at most increase the memory cost on the JM side by 200 * 200 * 20K = 800MB)
2. Improve the union state implementation
3. Further increase `state.backend.fs.memory-threshold`

What do you think?

Thanks.

Best Regards,
Yu

On Sat, 16 May 2020 at 23:15, Yun Tang <myas...@live.com> wrote:

> If we cannot get rid of union state, I think we should introduce memory
> control on the serialized TDDs when deploying tasks, instead of changing
> how union state is implemented when assigning state in StateAssignmentOperation.
> The duplicated TaskStateSnapshots would not really increase memory much, as
> the ByteStreamStateHandles actually share the same reference until they are
> serialized.
>
> When talking about the estimated memory footprint, I previously thought it
> depends on the pool size of the future executor (Hardware#getNumberCPUCores).
> However, with the simple program below, I found that the async task-submission
> logic makes the number of RemoteRpcInvocations existing in the JM at the same
> time larger than Hardware#getNumberCPUCores.
> Take the program below for example: we have a source with 200 parallelism, and
> the number of RemoteRpcInvocations existing in the JM at the same time could be
> nearly 200, while our future-executor pool size is only 96. I think if we could
> clear the serialized data in a RemoteRpcInvocation as soon as possible, we might
> mitigate this problem greatly.
>
> Simple program which uses union state to reproduce the memory footprint problem:
> each sub-task's share of the union state is a 100KB byte array, and with 200
> sub-tasks in total this could lead to more than 100KB * 200 * 200 = 3.8GB of
> memory for all the union state.
>
> public class Program {
>     private static final Logger LOG = LoggerFactory.getLogger(Program.class);
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(60 * 1000L);
>         env.addSource(new MySource()).setParallelism(200).print();
>         env.execute("Mock program");
>     }
>
>     private static class MySource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {
>         private static final ListStateDescriptor<byte[]> stateDescriptor =
>                 new ListStateDescriptor<>("list-1", byte[].class);
>         private ListState<byte[]> unionListState;
>         private volatile boolean running = true;
>
>         @Override
>         public void snapshotState(FunctionSnapshotContext context) throws Exception {
>             unionListState.clear();
>             byte[] array = new byte[100 * 1024];
>             ThreadLocalRandom.current().nextBytes(array);
>             unionListState.add(array);
>         }
>
>         @Override
>         public void initializeState(FunctionInitializationContext context) throws Exception {
>             if (context.isRestored()) {
>                 unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
>                 List<byte[]> collect = StreamSupport.stream(unionListState.get().spliterator(), false)
>                         .collect(Collectors.toList());
>                 LOG.info("union state Collect size: {}.", collect.size());
>             } else {
>                 unionListState = context.getOperatorStateStore().getUnionListState(stateDescriptor);
>             }
>         }
>
>         @Override
>         public void run(SourceContext<Integer> ctx) throws Exception {
>             while (running) {
>                 synchronized (ctx.getCheckpointLock()) {
>                     ctx.collect(ThreadLocalRandom.current().nextInt());
>                 }
>                 Thread.sleep(100);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
> }
>
> Best
> Yun Tang
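A note on the numbers above: the footprint scales with the square of the parallelism because of the broadcast semantics of union state on restore, where every sub-task receives every other sub-task's entries, while plain operator list state is redistributed round-robin. Below is a minimal sketch of that difference with made-up state names; it only illustrates the semantics, since connectors such as the Kafka source rely on the union behaviour, which is why this thread is about improving the implementation rather than simply switching to split state.

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;

    public class StateFlavours {

        // Called from a CheckpointedFunction#initializeState implementation.
        static void illustrate(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<byte[]> unionDescriptor = new ListStateDescriptor<>("union-list", byte[].class);
            ListStateDescriptor<byte[]> splitDescriptor = new ListStateDescriptor<>("split-list", byte[].class);

            // Union semantics: on restore, every sub-task receives the entries of ALL sub-tasks,
            // so with parallelism p and ~100KB written per sub-task, each task is handed ~p * 100KB of state.
            ListState<byte[]> unionState = context.getOperatorStateStore().getUnionListState(unionDescriptor);

            // Split (round-robin) semantics: on restore, the entries are redistributed across
            // sub-tasks, so each sub-task only receives roughly 1/p of them.
            ListState<byte[]> splitState = context.getOperatorStateStore().getListState(splitDescriptor);
        }
    }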
> ________________________________
> From: Stephan Ewen <se...@apache.org>
> Sent: Saturday, May 16, 2020 18:56
> To: dev <dev@flink.apache.org>
> Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
>
> Okay, thank you for all the feedback.
>
> So we should definitely work on getting rid of the Union State, or at least
> change the way it is implemented (avoid duplicate serializer snapshots).
>
> Can you estimate from which cluster size on the JM heap usage becomes
> critical (if we increased the threshold to 100k, or maybe 50k)?
>
>
> On Sat, May 16, 2020 at 8:10 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
> > Hi,
> >
> > Overall, I agree with increasing this value, but a default value of 100K
> > may be too large from my point of view.
> >
> > I want to share some more information from my side.
> >
> > The small files problem is indeed a problem many users may encounter in
> > production env. The states (keyed state and operator state) can become
> > small files in DFS, but increasing the value of `state.backend.fs.memory-threshold`
> > may lead to the JM OOM problem, as Yun said previously.
> > We've tried to increase this value in our production env, but some connectors
> > which use UnionState prevented us from doing so; the memory consumed by these
> > jobs can be very large (in our case, with thousands of parallelism and
> > `state.backend.fs.memory-threshold` set to 64K, the JM would consume 10G+ memory),
> > so in the end we used the solution proposed in FLINK-11937 [1] for both keyed
> > state and operator state.
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11937
> > Best,
> > Congxian
> >
> >
> > Yun Tang <myas...@live.com> wrote on Fri, May 15, 2020, 9:09 PM:
> >
> > > Please correct me if I am wrong: does "put the increased value into the
> > > default configuration" mean we will update it in the default flink-conf.yaml
> > > but still leave the default value of `state.backend.fs.memory-threshold`
> > > as before?
> > > It seems I did not get why existing setups with existing configs would
> > > not be affected.
> > >
> > > The concern I raised is because one of our large-scale jobs, with a
> > > 1024-parallelism source using union state, met the JM OOM problem when we
> > > increased this value.
> > > I think if we introduce memory control when serializing TDDs asynchronously
> > > [1], we could be much more confident about increasing this configuration,
> > > as the memory footprint at that point is expanded by a lot of serialized TDDs.
> > >
> > >
> > > [1] https://github.com/apache/flink/blob/32bd0944d0519093c0a4d5d809c6636eb3a7fc31/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L752
> > >
> > > Best
> > > Yun Tang
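To put the above in rough numbers: because every sub-task's TaskDeploymentDescriptor carries the inlined handles of all sub-tasks of a union-state operator, the worst-case JM-side footprint during deployment grows with the square of the parallelism, assuming, as observed earlier in the thread, that nearly all serialized RemoteRpcInvocations can exist at the same time. A back-of-the-envelope sketch, using only the figures quoted in this thread:

    public class JmFootprintEstimate {

        // Worst-case bytes of inlined union state held on the JM while deploying tasks:
        // each of the p sub-tasks gets a TDD carrying all p sub-tasks' inlined handles.
        static long worstCaseUnionStateBytes(long parallelism, long bytesPerSubtask) {
            return parallelism * parallelism * bytesPerSubtask;
        }

        public static void main(String[] args) {
            System.out.println(worstCaseUnionStateBytes(200, 20 * 1024));   // ~800 MB, the step-1 bound above
            System.out.println(worstCaseUnionStateBytes(200, 100 * 1024));  // ~3.8 GB, the reproduction program above
            System.out.println(worstCaseUnionStateBytes(1024, 100 * 1024)); // ~100 GB, the 1024-parallelism case below
        }
    }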
> > > ________________________________
> > > From: Stephan Ewen <se...@apache.org>
> > > Sent: Friday, May 15, 2020 16:53
> > > To: dev <dev@flink.apache.org>
> > > Cc: Till Rohrmann <trohrm...@apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > >
> > > I see, thanks for all the input.
> > >
> > > I agree with Yun Tang that the use of UnionState is problematic and can
> > > cause issues in conjunction with this.
> > > However, most of the large-scale users I know that struggle with UnionState
> > > have also increased this threshold, because with the low default they get an
> > > excess number of small files and overwhelm their HDFS / S3 / etc.
> > >
> > > An intermediate solution could be to put the increased value into the
> > > default configuration. That way, existing setups with existing configs will
> > > not be affected, but new users / installations will have a simpler time.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, May 14, 2020 at 9:20 PM Yun Tang <myas...@live.com> wrote:
> > >
> > > > I tend to be not in favor of this proposal, as union state is somewhat
> > > > abused in several popular source connectors (e.g. Kafka), and increasing
> > > > this value could lead to JM OOM when sending TDDs from the JM to TMs with
> > > > large parallelism.
> > > >
> > > > After we collect the union state and initialize the map list [1], we already
> > > > have the union state ready to assign. At this point, the memory footprint has
> > > > not increased much, as the union state shared across tasks still holds the
> > > > same ByteStreamStateHandle references. However, when we send the TDDs with
> > > > the taskRestore to the TMs, Akka will serialize those ByteStreamStateHandles
> > > > within the TDDs, which increases the memory footprint. If the source has a
> > > > parallelism of 1024, any one of the sub-tasks would then carry 1024 * 100KB
> > > > of state handles. The total memory footprint cannot be ignored.
> > > >
> > > > If we plan to increase the default value of state.backend.fs.memory-threshold,
> > > > we should first resolve the above case. In other words, this proposal could be
> > > > a trade-off which benefits perhaps 99% of users but might bring harmful effects
> > > > to the 1% of users with large-scale Flink jobs.
> > > >
> > > >
> > > > [1] https://github.com/apache/flink/blob/c1ea6fcfd05c72a68739bda8bd16a2d1c15522c0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L64-L87
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > >
> > > > ________________________________
> > > > From: Yu Li <car...@gmail.com>
> > > > Sent: Thursday, May 14, 2020 23:51
> > > > To: Till Rohrmann <trohrm...@apache.org>
> > > > Cc: dev <dev@flink.apache.org>; Piotr Nowojski <pi...@ververica.com>
> > > > Subject: Re: [DISCUSS] increase "state.backend.fs.memory-threshold" from 1K to 100K
> > > >
> > > > TL;DR: I have some reservations but tend to be +1 for the proposal; meanwhile
> > > > I suggest we work out a more thorough solution in the long run.
> > > >
> > > > Please correct me if I'm wrong, but it seems the root cause of the issue is
> > > > that too many small files are generated.
> > > >
> > > > I have some concerns for the session cluster case [1], as well as possible
> > > > issues for users at large scale; otherwise I think increasing
> > > > `state.backend.fs.memory-threshold` to 100K is a good choice, based on the
> > > > assumption that a large portion of our users are running small jobs with
> > > > small states.
> > > >
> > > > OTOH, maybe extending the solution [2] for the RocksDB small-file problem
> > > > (as proposed in FLINK-11937 [3]) to also support operator state could be an
> > > > alternative?
> > > > We have already applied that solution in production for operator state, and
> > > > it solved the HDFS NN RPC bottleneck problem on last year's Singles' Day.
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > > [1] https://ci.apache.org/projects/flink/flink-docs-stable/concepts/glossary.html#flink-session-cluster
> > > > [2] https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg <https://docs.google.com/document/d/1ukLfqNt44yqhDFL3uIhd68NevVdawccb6GflGNFzLcg/edit#heading=h.rl48knhoni0h>
> > > > [3] https://issues.apache.org/jira/browse/FLINK-11937
> > > >
> > > >
> > > > On Thu, 14 May 2020 at 21:45, Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > > I cannot say much about the concrete value, but if our users have problems
> > > > > with the existing default value, then it makes sense to me to change it.
> > > > >
> > > > > One thing to check could be whether it is possible to provide a meaningful
> > > > > exception in case the state size exceeds the frame size. At the moment,
> > > > > Flink should fail with a message saying that an RPC message exceeds the
> > > > > maximum frame size. Maybe it is also possible to point the user towards
> > > > > "state.backend.fs.memory-threshold" if the message exceeds the frame size
> > > > > because of too much state.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, May 14, 2020 at 2:34 PM Stephan Ewen <se...@apache.org> wrote:
> > > > >
> > > > >> The parameter "state.backend.fs.memory-threshold" decides when a state
> > > > >> will become a file and when it will be stored inline with the metadata
> > > > >> (to avoid excessive amounts of small files).
> > > > >>
> > > > >> By default, this threshold is 1K - so every state above that size becomes
> > > > >> a file. For many cases, this threshold seems to be too low.
> > > > >> There is an interesting talk with background on this from Scott Kidder:
> > > > >> https://www.youtube.com/watch?v=gycq0cY3TZ0
> > > > >>
> > > > >> I wanted to discuss increasing this to 100K by default.
> > > > >>
> > > > >> Advantage:
> > > > >>   - This should help many users out of the box, who otherwise see
> > > > >>     checkpointing problems on systems like S3, GCS, etc.
> > > > >>
> > > > >> Disadvantage:
> > > > >>   - For very large jobs, this increases the required heap memory on the JM
> > > > >>     side, because more state needs to be kept in-line when gathering the
> > > > >>     acks for a pending checkpoint.
> > > > >>   - If tasks have a lot of states and each state is roughly at this
> > > > >>     threshold, we increase the chance of exceeding the RPC frame size and
> > > > >>     failing the job.
> > > > >>
> > > > >> What do you think?
> > > > >>
> > > > >> Best,
> > > > >> Stephan
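For completeness, the threshold being discussed can be raised either cluster-wide via `state.backend.fs.memory-threshold` in flink-conf.yaml, or per job when constructing the FsStateBackend. A minimal sketch of the per-job variant follows; the checkpoint URI and the 100K value are placeholders, and if I recall correctly the backend rejects thresholds above 1MB.

    import java.net.URI;

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ThresholdExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60 * 1000L);

            // Keep state blobs up to 100KB inline in the checkpoint metadata instead of
            // writing each of them as a separate file on the checkpoint file system.
            env.setStateBackend(new FsStateBackend(new URI("hdfs:///flink/checkpoints"), 100 * 1024));

            env.fromElements(1, 2, 3).print();
            env.execute("threshold example");
        }
    }

As discussed above, raising the value trades small-file pressure on HDFS / S3 against JM heap usage and the RPC frame size.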