Hi Vinay!

If you see that the memory usage is different when you checkpoint, it can
be due to two things:

(1) RocksDB needs to hold onto some snapshot state internally while the
async background snapshot happens. That requires some memory.
(2) There is often data buffered during the alignment of checkpoints, which
Flink writes to local file streams. That means that the OS disk cache takes
up more memory, if spare memory is available.

Since you are writing to S3, there may be more implications. S3 sometimes
throttles connections, meaning that some nodes get less upload bandwidth
than others, and their snapshot takes longer.
Those nodes have to hold onto the snapshot state for longer before they can
release it.


A few things I would try:

  - See how it works if you make checkpoints less frequent, giving the
application more time between checkpoints. Once you find a stable interval,
let's tune it from there (make improvements that allow a shorter interval).
  - Incremental checkpoints are going to help a lot with making the interval
shorter again; we are trying to get those into Flink 1.3.

  - You can try to optimize the program a bit to make processing per record
faster. That helps it catch up faster if one of the nodes becomes a
straggler during checkpoints.
  - Common things to tune are enabling object reuse (if your program is
safe for that) and making sure the types you store in the state
serialize efficiently.
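
To give a feeling for the serialization point, here is a minimal sketch in
plain Java. It is an analogy only: Flink uses its own type serializers for
state (falling back to Kryo for types it cannot analyze), not
java.io.Serializable. The SensorReading type and its fields are hypothetical
and stand in for whatever you keep in state. The sketch compares a generic
format that writes class metadata with the object against a format that
writes only the field values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class StateSizeSketch {

    // Hypothetical state type, used only for illustration.
    static class SensorReading implements Serializable {
        private static final long serialVersionUID = 1L;
        final long sensorId;
        final long timestamp;
        final double value;

        SensorReading(long sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Generic Java serialization: a stream header and a class descriptor
    // are written in addition to the field values.
    static int javaSerializedSize(SensorReading r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(r);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Field-by-field encoding: only the three 8-byte values are written.
    static int compactSize(SensorReading r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(r.sensorId);
            out.writeLong(r.timestamp);
            out.writeDouble(r.value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        SensorReading r = new SensorReading(42L, 1487873580000L, 21.5);
        System.out.println("generic: " + javaSerializedSize(r) + " bytes");
        System.out.println("compact: " + compactSize(r) + " bytes");
    }
}
```

The per-record difference is multiplied by every entry in state, so it shows
up both in the state size and in the time a snapshot needs to upload.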


Concerning the FsStateBackend:

  - It stores all objects on the heap and does not hit disk. It works well
enough if you don't go to the limit of the JVM heap (the JVM performs badly
if it does not have a certain amount of spare heap memory during GC).

  - It currently snapshots synchronously, which gives a throughput hit upon
checkpoints.

  - We have a brand new variant that does this asynchronously and thus
should behave much better. We will merge that at the beginning of next
week. That one could be worth checking out for you; I will ping you once it
is available. Maybe Stefan (in cc) has an early access branch that he can
share.


Hope that helps!


Greetings,
Stephan



On Thu, Feb 23, 2017 at 6:13 PM, vinay patil <vinay18.pa...@gmail.com>
wrote:

> Hi,
>
> When I disabled checkpointing the memory usage is similar for all nodes,
> this means that for checkpointing enabled case the data is first flushed
> to memory of CORE nodes (DataNode daemon is running here in case of EMR).
>
> I am going to run with FSStatebackend on a high end cluster with 122GB
> RAM, in case of FSStatebackend does it use TM heap memory or physical
> memory to store the state ?
>
> Regards,
> Vinay Patil
>
> On Thu, Feb 23, 2017 at 7:50 PM, vinay patil [via Apache Flink User
> Mailing List archive.] wrote:
>
>> Hi Stephan,
>>
>> Anyways the Async exception is gone.
>>
>> I have increased my instance type to r3.2xlarge having 60GB of memory.
>> But what I have observed here is that for two task managers the memory
>> usage is close to 30GB but for other two it goes up to 55GB, the load is
>> equally distributed among all TM's.
>> Why does this happen ?
>>
>
