Hi,

I see that tuning the RocksDB configuration to fit a given container size can be very tedious and error-prone for users. I have opened a Jira issue to start improving the situation: https://issues.apache.org/jira/browse/FLINK-7289. Please feel free to comment and share your experiences or ideas; they could be very valuable input.
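Until that happens, one way to bound RocksDB's native memory footprint today is to pass a custom OptionsFactory to the RocksDBStateBackend. The sketch below only illustrates the mechanism and is not a recommendation: the class name and all sizes are made up, and the real footprint scales with the number of stateful operators, slots, and column families per TaskManager.

```
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Hypothetical example: caps memtable and block-cache sizes per column family.
// All numbers are illustrative and must be sized against the container budget.
public class BoundedMemoryOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // keep background flushing modest so memtables are flushed steadily
        return currentOptions.setMaxBackgroundFlushes(2);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setWriteBufferSize(64 * 1024 * 1024)   // 64 MB per memtable
                .setMaxWriteBufferNumber(3)             // at most 3 memtables per column family
                .setTableFormatConfig(
                        new BlockBasedTableConfig()
                                .setBlockCacheSize(256 * 1024 * 1024)); // 256 MB block cache
    }
}
```

It would then be registered via RocksDBStateBackend#setOptions(new BoundedMemoryOptions()) before calling env.setStateBackend(...). Keep in mind that these limits apply per column family, so the total native memory is roughly that amount multiplied by the number of states and operator instances on the TaskManager.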
One consideration: from what you shared I can see that you are using 8 slots per TaskManager and a heap size of 35840 MB. This means that there are potentially also up to 8 RocksDB instances on one TM. Furthermore, when you are using RocksDB, your heavy state will typically live in RocksDB (native memory) and no longer on the JVM heap. I think it would make a lot of sense to reduce your maximum heap size dramatically, so that more memory from your container budget is available as native memory for RocksDB; a rough configuration sketch along these lines is appended after the quoted messages below. I hope this can also help with your problem.

Best,
Stefan

> On 27.07.2017, at 10:49, Shashwat Rastogi <shashwat.rast...@reflektion.com> wrote:
> 
> Hi Kien,
> 
> Sorry it took me some time to fetch the logs. I am attaching the logs of the machine which died due to lack of free memory.
> 
> <c01-log-1300to1430.txt>
> 
> I have only set
> `taskmanager.heap.mb: 35840`
> `taskmanager.numberOfTaskSlots: 8`
> The rest are just the default properties in flink-conf.yaml.
> 
> Thank you in advance.
> 
> Regards,
> Shashwat
> 
>> On 26-Jul-2017, at 12:10 PM, Kien Truong <duckientru...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> What is your task manager memory configuration? Can you post the TaskManager's log?
>> 
>> Regards,
>> 
>> Kien
>> 
>> 
>> On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:
>>> Hi,
>>> 
>>> We have several Flink jobs, all of which read data from Kafka, do some aggregations (over sliding windows of 1 day, sliding every 1 hour) and write data to Cassandra. Something like:
>>> 
>>> ```
>>> DataStream<String> lines = env.addSource(new FlinkKafkaConsumer010<>( … ));
>>> DataStream<Event> events = lines.map(line -> parse(line));
>>> DataStream<Statistics> stats = events
>>>     .keyBy("id")
>>>     .timeWindow(Time.days(1), Time.hours(1))
>>>     .apply(new MyAggregateFunction());
>>> writeToCassandra(stats);
>>> ```
>>> 
>>> We recently made the switch to RocksDBStateBackend, for its suitability for large states/long windows. However, after making the switch a memory issue has come up: the memory utilisation on the TaskManager gradually increases from 50 GB to ~63 GB until the container is killed. We are unable to figure out what is causing this behaviour; is there some memory leak in RocksDB?
>>> 
>>> How much memory should we allocate to the Flink TaskManager? Since RocksDB is a native library and does not use the JVM heap, how much of the memory should we allocate/leave for RocksDB (out of 64 GB of total memory)?
>>> Is there a way to set the maximum amount of memory that will be used by RocksDB so that it doesn't overwhelm the system? Are there some recommended optimal settings for RocksDB for larger states (for the 1-day window, the average state size is 3 GB)?
>>> 
>>> Any help would be greatly appreciated. I am using Flink v1.2.1.
>>> Thanks in advance.
>>> 
>>> Best,
>>> Shashwat
>> 
> 
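As a concrete illustration of the heap-reduction suggestion referenced above: on a 64 GB machine, the flink-conf.yaml from this thread could be shifted roughly as follows. The 8192 MB figure is only a made-up starting point, not a tested recommendation; the right value depends on how much non-RocksDB state and user code actually lives on the heap.

```
# illustrative values only, for a TaskManager container with ~64 GB of memory
taskmanager.heap.mb: 8192           # much smaller JVM heap; heavy keyed state lives in RocksDB
taskmanager.numberOfTaskSlots: 8    # unchanged; up to 8 RocksDB instances share the native memory
# the remaining ~50 GB of the container budget stays available to RocksDB and the OS page cache
```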