Thanks for proposing this FLIP, Xintong. All in all I think it already looks quite good.

Concerning the first open question about allocating memory segments, I was wondering whether this is strictly necessary to do in the context of this FLIP, or whether it could be done as a follow-up. Without knowing all the details, I would be concerned that it widens the scope of this FLIP too much, because we would have to touch all the existing call sites of the MemoryManager where we allocate memory segments (these should mainly be batch operators). The addition of the memory reservation call to the MemoryManager should not be affected by this, and I would hope that it is the only point of interaction a streaming job would have with the MemoryManager.
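To make it concrete what I mean by "only point of interaction": something as small as a reserve/release pair should suffice for the streaming side. The method names below are just a sketch, not the existing MemoryManager API:

    // Hypothetical sketch only: reserveMemory/releaseMemory are made-up
    // names, not the current MemoryManager interface.
    void useNativeMemory(MemoryManager memoryManager, Object owner) throws Exception {
        long bytes = 64 * 1024 * 1024; // e.g. the budget granted to a RocksDB instance
        memoryManager.reserveMemory(owner, bytes); // fails fast if the budget is exhausted
        try {
            // the memory itself is then allocated natively (e.g. by RocksDB),
            // outside the MemoryManager's segment pool
        } finally {
            memoryManager.releaseMemory(owner, bytes); // hand the budget back
        }
    }

Allocating actual MemorySegments would then remain an implementation detail of the batch operators and could be reworked separately.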
Concerning the second open question about setting or not setting a max direct memory limit, I would also be interested in why Yang Wang thinks leaving it open would be best. My concern is that we would end up in a similar situation as we are in now with the RocksDBStateBackend: if the different memory pools are not clearly separated and can spill over into one another, it is quite hard to understand what exactly causes a process to get killed for using too much memory. This could easily lead to a situation similar to what we have with the cutoff ratio. So why not set a sane default value for max direct memory and give users an option to increase it if they run into an OOM (a small JDK demo further down in this mail illustrates the resulting failure mode)?

@Xintong, how would alternative 2 lead to lower memory utilization than alternative 3, where we set the direct memory to a higher value?

Cheers,
Till

On Fri, Aug 9, 2019 at 9:12 AM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for the feedback, Yang.
>
> Regarding your comments:
>
> *Native and Direct Memory*
> I think setting a very large max direct memory size definitely has some
> good sides. E.g., we do not need to worry about direct OOMs, and we don't
> even need to allocate managed / network memory with Unsafe.allocate().
> However, there are also some downsides to doing this.
>
> - One thing I can think of is that if a task executor container is killed
> due to overusing memory, it could be hard for us to know which part of
> the memory is overused.
> - Another downside is that the JVM never triggers GC due to reaching the
> max direct memory limit, because the limit is too high to be reached.
> That means we kind of rely on heap memory to trigger GC and release
> direct memory. That could be a problem in cases where we have a lot of
> direct memory usage but not enough heap activity to trigger the GC.
>
> Maybe you can share your reasons for preferring a very large value, in
> case there is anything else I overlooked.
>
> *Memory Calculation*
> If there is any conflict between multiple configuration options that the
> user explicitly specified, I think we should throw an error.
> I think doing the checking on the client side is a good idea, so that on
> Yarn / K8s we can discover the problem before submitting the Flink
> cluster, which is always a good thing.
> But we cannot rely on the client-side checking alone, because for a
> standalone cluster the TaskManagers on different machines may have
> different configurations and the client does not see them.
> What do you think?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Aug 8, 2019 at 5:09 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
> > Hi Xintong,
> >
> > Thanks for your detailed proposal. Once all the memory configuration
> > options are introduced, we will have much more powerful control over
> > Flink's memory usage. I just have a few questions about it.
> >
> > - Native and Direct Memory
> >
> > We do not differentiate user direct memory and native memory. They are
> > all included in task off-heap memory. Right? So I don't think we can
> > set -XX:MaxDirectMemorySize properly. I prefer leaving it at a very
> > large value.
> >
> > - Memory Calculation
> >
> > If the sum of the fine-grained memory pools (network memory, managed
> > memory, etc.) is larger than the total process memory, how do we deal
> > with this situation? Do we need to check the memory configuration in
> > the client?
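Regarding the calculation check raised just above: a plausible client-side validation would simply compare the sum of the explicitly configured pools against the total process memory and fail fast. Rough sketch only; the pool names follow the FLIP's terminology, but the class and method are made up for illustration:

    // Hypothetical client-side sanity check; not actual Flink code.
    final class MemoryConfigCheck {
        static void validate(
                long totalProcessMemory,
                long frameworkHeap,
                long taskHeap,
                long taskOffHeap,
                long networkMemory,
                long managedMemory) {
            long sum = frameworkHeap + taskHeap + taskOffHeap
                    + networkMemory + managedMemory;
            if (sum > totalProcessMemory) {
                throw new IllegalArgumentException(
                        "Sum of explicitly configured memory pools (" + sum
                        + " bytes) exceeds total process memory ("
                        + totalProcessMemory + " bytes).");
            }
        }
    }

As Xintong points out, the same check would still have to run on the TaskManager side for standalone setups, since the client cannot see per-machine configurations.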
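And coming back to the direct memory limit, here is the demo I promised above. With a bounded -XX:MaxDirectMemorySize, overusing direct memory fails inside the JVM with an error that names the pool, instead of the container silently getting killed. Plain JDK, no Flink involved:

    // Run with: java -XX:MaxDirectMemorySize=64m DirectLimitDemo
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class DirectLimitDemo {
        public static void main(String[] args) {
            List<ByteBuffer> buffers = new ArrayList<>();
            try {
                while (true) {
                    // direct buffers are counted against -XX:MaxDirectMemorySize
                    buffers.add(ByteBuffer.allocateDirect(8 * 1024 * 1024));
                }
            } catch (OutOfMemoryError e) {
                // fails with "Direct buffer memory", so the overused pool is
                // identifiable, unlike with a container OOM kill
                System.out.println("Hit the direct memory limit after "
                        + buffers.size() + " buffers: " + e.getMessage());
            }
        }
    }

Note that memory allocated via Unsafe.allocateMemory is not counted against this limit, which is Xintong's point about being able to place managed / network memory outside of it.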
> >
> > On Wed, Aug 7, 2019 at 10:14 PM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > We would like to start a discussion thread on "FLIP-49: Unified Memory
> > > Configuration for TaskExecutors" [1], where we describe how to improve
> > > the TaskExecutor memory configuration. The FLIP document is mostly
> > > based on an early design, "Memory Management and Configuration
> > > Reloaded" [2], by Stephan, with updates from follow-up discussions
> > > both online and offline.
> > >
> > > This FLIP addresses several shortcomings of the current (Flink 1.9)
> > > TaskExecutor memory configuration:
> > >
> > > - Different configuration for Streaming and Batch.
> > > - Complex and difficult configuration of RocksDB in Streaming.
> > > - Complicated, uncertain, and hard to understand.
> > >
> > > The key changes to solve these problems can be summarized as follows:
> > >
> > > - Extend the memory manager to also account for memory usage by state
> > > backends.
> > > - Modify how TaskExecutor memory is partitioned and accounted, using
> > > individual memory reservations and pools.
> > > - Simplify the memory configuration options and calculation logic.
> > >
> > > Please find more details in the FLIP wiki document [1].
> > >
> > > (Please note that the early design doc [2] is out of sync; it would be
> > > appreciated to have the discussion in this mailing list thread.)
> > >
> > > Looking forward to your feedback.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > >
> > > [2]
> > > https://docs.google.com/document/d/1o4KvyyXsQMGUastfPin3ZWeUXWsJgoL7piqp1fFYJvA/edit?usp=sharing