Hi,

I see that matching the RocksDB configuration to certain container sizes can be 
very tedious and error-prone for users. I have opened a JIRA to start improving 
the situation: https://issues.apache.org/jira/browse/FLINK-7289. Please feel free 
to comment and share your experiences or ideas, they might be very valuable input.

One consideration: from what you shared, I can see that you are using 8 slots 
per TaskManager and a heap size of 35840 MB. This means that there are potentially 
also up to 8 RocksDB instances on one TM. Furthermore, when you are using RocksDB, 
your heavy state will typically live in RocksDB (native memory) and no longer 
on the JVM heap. I think it would make a lot of sense to reduce your maximum 
heap size dramatically, so that more memory from your container budget is 
available as native memory for RocksDB. I hope this also helps with your 
problem.
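
To illustrate the idea, a rough flink-conf.yaml sketch for a 64 GB container 
could look like the following. The concrete numbers are only assumptions for 
illustration, not a tested recommendation; the right split depends on your 
state size and workload:

```
# illustrative values only -- adjust to your workload
taskmanager.heap.mb: 10240          # much smaller JVM heap, since heavy state lives off-heap in RocksDB
taskmanager.numberOfTaskSlots: 8    # up to 8 parallel subtasks, each potentially with its own RocksDB instance(s)
# the remaining ~50 GB of the 64 GB container then stays available as native
# memory for RocksDB (memtables, block cache, index/filter blocks)
```

Keep in mind that whatever native memory you leave free is shared by all 
RocksDB instances on that TM, so with 8 slots the per-slot budget is roughly 
the remaining memory divided by 8.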

Best,
Stefan

> Am 27.07.2017 um 10:49 schrieb Shashwat Rastogi 
> <shashwat.rast...@reflektion.com>:
> 
> Hi Kien,
> 
> Sorry it took me sometime to fetch the logs. I am attaching logs of the 
> machine which died due to lack of free memory. 
> 
> <c01-log-1300to1430.txt>
> 
> I have set only
> `taskmanager.heap.mb: 35840`
> `taskmanager.numberOfTaskSlots: 8`
> And the rest are just default properties in the flink-conf.yaml
> 
> Thank you in advance.
> 
> Regards
> Shashwat
> 
>> On 26-Jul-2017, at 12:10 PM, Kien Truong <duckientru...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> What is your TaskManager memory configuration? Can you post the 
>> TaskManager's log?
>> 
>> Regards,
>> 
>> Kien
>> 
>> 
>> On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:
>>> Hi,
>>> 
>>> We have several Flink jobs, all of which read data from Kafka, do some 
>>> aggregations (over sliding windows of 1 day, sliding by 1 hour), and write the 
>>> results to Cassandra. Something like:
>>> 
>>> ```
>>> DataStream<String> lines = env.addSource(new FlinkKafkaConsumer010( … ));
>>> DataStream<Event> events = lines.map(line -> parse(line));
>>> DataStream<Statistics> stats = events
>>>     .keyBy("id")
>>>     .timeWindow(Time.days(1), Time.hours(1))
>>>     .sum(new MyAggregateFunction());
>>> writeToCassandra(stats);
>>> ```
>>> 
>>> We recently made the switch to the RocksDBStateBackend, for its suitability for 
>>> large states/long windows. However, after making the switch a memory issue 
>>> has come up: the memory utilisation on the TaskManager gradually increases from 
>>> 50 GB to ~63 GB until the container is killed. We are unable to figure out 
>>> what is causing this behaviour; is there some memory leak in RocksDB?
>>> 
>>> How much memory should we allocate to the Flink TaskManager? Since RocksDB 
>>> is a native library and does not use the JVM heap, how much of the memory 
>>> should we allocate/leave for RocksDB (out of 64 GB of total memory)?
>>> Is there a way to set the maximum amount of memory that will be used by 
>>> RocksDB so that it doesn't overwhelm the system? Are there some 
>>> recommended optimal settings for RocksDB for larger states (for a 1 day 
>>> window the average state size is 3 GB)?
>>> 
>>> Any help would be greatly appreciated. I am using Flink v1.2.1.
>>> Thanks in advance.
>>> 
>>> Best,
>>> Shashwat
>> 
> 
