Hi Vinay!

If you see that the memory usage is different when you checkpoint, it can be due to two things:

(1) RocksDB needs to hold onto some snapshot state internally while the asynchronous background snapshot happens. That requires some memory.

(2) There is often data buffered during the alignment of checkpoints, which Flink writes to local file streams. That means that the disk cache takes more memory, if spare memory is available.

Since you are writing to S3, there may be further implications. S3 sometimes throttles connections, meaning that some nodes get less upload bandwidth than others, and their snapshots take longer. Those nodes have to hold onto the snapshot state for longer before they can release it.

A few things I would try:

- See how it works if you make checkpoints less frequent, giving the application more time between checkpoints. Once you find a stable interval, let's tune it from there (make improvements that allow a shorter interval).
- Incremental checkpoints are going to help a lot with making the interval shorter again; we are trying to get those into Flink 1.3.
- You can try to optimize the program a bit to make processing per record faster. That helps to catch up faster if one of the nodes becomes a straggler during checkpoints.
- Common options to tune: see if you can enable object reuse (if your program is safe for it), and make sure the types you store in the state serialize efficiently.

Concerning the FsStateBackend:

- It stores all objects on the heap and does not touch disk. It works well enough if you don't go to the limit of the JVM heap (the JVM performs badly if it does not have a certain amount of spare heap memory during GC).
- It currently snapshots synchronously, which gives a throughput hit upon checkpoints.
- We have a brand new variant that does this asynchronously and thus should behave much better. We will merge that at the beginning of next week. That one could be worth checking out for you; I will ping you once it is available. Maybe Stefan (in cc) has an early access branch that he can share.

Hope that helps!
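On the heap point for the FsStateBackend above: a rough way to see how much spare heap a JVM has is to read the figures from `java.lang.Runtime`. The sketch below is only an illustration using the plain JDK, not a Flink API; the class name `HeapHeadroom` and the 30% threshold are hypothetical choices, not a Flink recommendation. In a real TaskManager you would more likely watch GC logs or heap metrics.

```java
public class HeapHeadroom {

    /**
     * Fraction of the maximum heap that is not currently in use.
     * used = total (currently committed heap) - free (unused part of it).
     */
    public static double headroomFraction(long maxBytes, long totalBytes, long freeBytes) {
        long usedBytes = totalBytes - freeBytes;
        return (double) (maxBytes - usedBytes) / maxBytes;
    }

    /** True if the spare heap is below the given minimum fraction. */
    public static boolean isTight(double headroom, double minHeadroom) {
        return headroom < minHeadroom;
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        double headroom = headroomFraction(rt.maxMemory(), rt.totalMemory(), rt.freeMemory());

        // Hypothetical threshold, chosen only for illustration.
        double minHeadroom = 0.30;

        System.out.printf("Spare heap: %.1f%% of max%n", headroom * 100);
        if (isTight(headroom, minHeadroom)) {
            System.out.println("Heap is close to its limit; GC may start to hurt throughput.");
        }
    }
}
```

This is a point-in-time reading, so it fluctuates with GC cycles; sampling it repeatedly gives a better picture than a single value.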
Greetings,
Stephan

On Thu, Feb 23, 2017 at 6:13 PM, vinay patil <vinay18.pa...@gmail.com> wrote:

> Hi,
>
> When I disabled checkpointing the memory usage is similar for all nodes,
> this means that for checkpointing enabled case the data is first flushed
> to memory of CORE nodes (DataNode daemon is running here in case of EMR).
>
> I am going to run with FSStatebackend on a high end cluster with 122GB
> RAM, in case of FSStatebackend does it use TM heap memory or physical
> memory to store the state?
>
> Regards,
> Vinay Patil
>
> On Thu, Feb 23, 2017 at 7:50 PM, vinay patil [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Hi Stephan,
>>
>> Anyways the Async exception is gone.
>>
>> I have increased my instance type to r3.2xlarge having 60GB of memory.
>> But what I have observed here is that for two task managers the memory
>> usage is close to 30GB but for other two it goes up to 55GB, the load is
>> equally distributed among all TM's.
>> Why does this happen?