Hi Ashish,

great to hear that things work better with the RocksDB state backend. I would only start playing with the containerized.heap-cutoff-ratio if you see TMs failing due to exceeding the direct memory limit. Currently, not all of the cutoff memory is set as the direct memory limit. We have a pending fix for that.
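
For reference, the cut-off ratio and the off-heap flag are plain flink-conf.yaml settings; a minimal sketch with illustrative values only (the cut-off ratio currently defaults to 0.25):

    # flink-conf.yaml (illustrative values, not a recommendation)
    # fraction of the YARN container memory subtracted from the JVM heap for off-heap/native use
    containerized.heap-cutoff-ratio: 0.3
    # allocate Flink's managed memory off-heap instead of on the JVM heap
    taskmanager.memory.off-heap: true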

Apart from that, it is indeed a good idea to test your application and monitor how it behaves when increasing the workload.

Cheers,
Till

On Mon, Oct 30, 2017 at 1:34 AM, ashish pok <ashish...@yahoo.com> wrote:

> Jorn, correct and I suppose that's where we are at this point. The RocksDB-based backend is definitely looking promising for our use case. Since I haven't gotten a definite no-no on using 30% for the YARN cut-off ratio (about 1.8GB out of 6GB of memory) and the off-heap flag turned on, we will continue on that path. The current plan is to increase throughput on the input streams - the state streams are pretty much processed already and preserved in RocksDB, and we can control the streams for joining with those states and monitor resource utilization + join performance. We are seeing 200-500ms processing times with a pretty decent amount of logging, which is pretty good for our needs.
>
> Agreed about the way to estimate the size of state, and hence one of the reasons for my original question on what others have done. Our states are essentially tuples (a few primitive values like string and long, plus a Map of string to string which holds about 10-12 keys; values are small - not more than 128 bytes tops). We created a savepoint after processing about 500k records and that's where my estimate came from. I'd be the first one to admit it is not accurate, but that's the best we could think of.
>
> Thanks, Ashish
>
> ------------------------------
> *From:* Jörn Franke <jornfra...@gmail.com>
> *To:* Ashish Pokharel <ashish...@yahoo.com>
> *Cc:* Till Rohrmann <trohrm...@apache.org>; user <user@flink.apache.org>
> *Sent:* Sunday, October 29, 2017 6:05 PM
> *Subject:* Re: Capacity Planning For Large State in YARN Cluster
>
> Well, you can only performance-test it beforehand in different scenarios with different configurations.
>
> I am not sure what exactly your state holds (e.g. how many objects etc.), but if it is Java objects then 3 times might be a little bit low (it also depends on how you initially tested the state size) - however, Flink optimizes this as well. Nevertheless, something like RocksDB is probably a better solution for larger states.
>
> On 29. Oct 2017, at 21:15, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> I got the same feedback from Robert Metzger over on Stack Overflow. I have switched my app to use RocksDB and yes, it did stabilize the app :)
>
> However, I am still struggling with how to map out my TM and JM memory, the number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 GB of total cluster memory. The idea was to make the states distributed, with approx. 1 GB of memory per slot. I have also changed the containerized.heap-cutoff-ratio config to 0.3 to allow a little room for RocksDB (RocksDB is using the basic spinning-disk-optimized pre-defined options, but we do have SSDs on our Prod machines that we can leverage in the future too) and set taskmanager.memory.off-heap to true. It feels more experimental at this point than an exact science :) If there are any further guidelines on how we can plan for this as we open up the flood gates to stream heavy continuous streams, that would be great.
>
> Thanks again,
>
> Ashish
>
> On Oct 27, 2017, at 8:45 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Ashish,
>
> what you are describing should be a good use case for Flink and it should be able to run your program.
>
> When you are seeing a GC overhead limit exceeded error, it means that Flink or your program is creating too many / too large objects, filling up the memory in a short time. I would recommend checking your user program to see whether you can avoid unnecessary object instantiations and whether it is possible to reuse created objects.
>
> Concerning Flink's state backends, the memory state backend is currently not able to spill to disk. Also, the managed memory is only relevant for DataSet/batch programs and not for streaming programs. Therefore, I would recommend you to try out the RocksDB state backend, which is able to gracefully spill to disk if the state size should grow too large. Consequently, you don't have to adjust the managed memory settings, because they currently don't have an effect on streaming programs.
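>
> For illustration, switching the job to that backend looks roughly like the following sketch (the checkpoint URI is just a placeholder, the spinning-disk options are only one possible choice, and the flink-statebackend-rocksdb dependency is required):
>
>     import org.apache.flink.contrib.streaming.state.PredefinedOptions;
>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>     public class RocksDbBackendSketch {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>             // Durable location for checkpoint data; this URI is only a placeholder.
>             RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
>
>             // Pre-defined RocksDB options tuned for spinning disks (FLASH_SSD_OPTIMIZED exists as well).
>             backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>
>             env.setStateBackend(backend);
>
>             // ... define the sources, the join and the sinks as before, then env.execute(...)
>         }
>     }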
>
> My gut feeling is that switching to the RocksDBStateBackend could already solve your problems. If this should not be the case, then please let me know again.
>
> Cheers,
> Till
>
> On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>
> Hi Everyone,
>
> We have hit a roadblock moving an app to Production scale and were hoping to get some guidance. The application is a pretty common use case in stream processing but does require maintaining a large number of keyed states. We are processing 2 streams - one of which is a daily burst of a stream (normally around 50 mil but it could go up to 100 mil in a one-hour burst) and the other is a constant stream of around 70-80 mil per hour. We are doing a low-level join using a CoProcess function between the two keyed streams. The CoProcess function needs to refresh (upsert) state from the daily burst stream and decorate the constantly streaming data with values from the state built from the bursty stream. All of the logic is working pretty well in a standalone Dev environment. We are throwing about 500k events of bursty traffic at it for state and about 2-3 mil events of the data stream. We have 1 TM with 16GB memory and 16 slots (1 per core on the server), and 1 JM with 8GB memory. We have been taking savepoints in case we need to restart the app for code changes etc. The app does seem to recover from state very well, too. Based on the savepoints, the total volume of state in the production flow should be around 25-30GB.
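>
> To give a rough idea of the shape of that CoProcess function, here is a simplified sketch (the type names RefData, Event and EnrichedEvent are made up for illustration; the real state is the tuple/Map structure described earlier):
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
>     import org.apache.flink.util.Collector;
>
>     // Simplified sketch; RefData, Event and EnrichedEvent stand in for our actual types.
>     public class EnrichmentFunction extends CoProcessFunction<RefData, Event, EnrichedEvent> {
>
>         private transient ValueState<RefData> refState;
>
>         @Override
>         public void open(Configuration parameters) {
>             refState = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("ref-data", RefData.class));
>         }
>
>         // Daily burst stream: upsert the keyed state.
>         @Override
>         public void processElement1(RefData ref, Context ctx, Collector<EnrichedEvent> out) throws Exception {
>             refState.update(ref);
>         }
>
>         // Constant stream: decorate each event with the state for its key, if present.
>         @Override
>         public void processElement2(Event event, Context ctx, Collector<EnrichedEvent> out) throws Exception {
>             RefData ref = refState.value();
>             if (ref != null) {
>                 out.collect(new EnrichedEvent(event, ref));
>             }
>         }
>     }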
>
> At this point, however, we are trying to deploy the app at production scale. The app also has a flag that can be set at startup time to ignore the data stream, so we can simply initialize state. So basically we are trying to see if we can initialize the state first and take a savepoint as a test. At this point we are using 10 TMs with 4 slots and 8GB memory each (the idea was to allocate around 3 times the estimated state size to start with), but TMs keep getting killed by YARN with a GC Overhead Limit Exceeded error. We have gone through quite a few blogs/docs on Flink managed memory, off-heap vs heap memory, disk spill-over, state backends etc. We did try to tweak the managed-memory configs in multiple ways (off/on heap, fraction, network buffers etc.) but can’t seem to figure out a good way to fine-tune the app to avoid the issues. Ideally, we would hold state in memory (we do have enough capacity in the Production environment for it) for performance reasons and spill over to disk (which I believe Flink should provide out of the box?). It feels like 3x the anticipated state volume in cluster memory should have been enough to just initialize the state. So instead of just continuing to increase memory (which may or may not help, as the error is regarding GC overhead), we wanted to get some input from experts on best practices and an approach to plan this application better.
>
> Appreciate your input in advance!