Just wanted to throw in my 2 cents.

I’ve been running pipelines with similar state size using RocksDB, which 
externalize checkpoints to S3 and write output to S3 through the bucketing 
sink. I was getting stalls like this and ended up tracing the problem to S3 and 
the bucketing sink. The solution was twofold:


1)       I forked hadoop-aws and have it treat Flink as the source of truth. 
EMR uses a DynamoDB table to determine if S3 is inconsistent. Instead, I say 
that if Flink believes that a file exists on S3 and we don’t see it, then I 
trust that Flink is in a consistent state and S3 is not. In this case, various 
operations back off and retry up to a certain number of times.


2)       The bucketing sink performs multiple renames over the lifetime of a 
file: once when a checkpoint starts and then again on notification after it 
completes. Due to S3’s consistency guarantees, the second rename of a file can 
never be assured to work and will eventually fail either during or after a 
checkpoint. Because there is no upper bound on the time it takes for a file on 
S3 to become consistent, retries cannot solve this specific problem: a rename 
could take many minutes, which would stall the entire pipeline. The only viable 
solution I could find was to write a custom sink that understands S3. Each 
writer writes its file locally and then copies it to S3 on checkpoint. By 
interacting with S3 only once per file, it can circumvent the consistency 
issues altogether.
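
The back-off-and-retry behaviour from point 1 can be sketched roughly as 
follows. This is only an illustration in Python, not the forked hadoop-aws 
code: the function name `wait_until_visible`, its parameters, and the 
exponential back-off schedule are all assumptions made for the example. The 
`exists` callable stands in for whatever check asks S3 whether the file that 
Flink expects is visible yet.

```python
import time
from typing import Callable


def wait_until_visible(
    exists: Callable[[], bool],
    max_attempts: int = 5,
    initial_delay_s: float = 0.1,
    backoff_factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `exists` until it returns True or the attempts run out.

    Hypothetical helper: the caller (Flink, in the scenario above) is assumed
    to be right that the object exists, so a negative answer from the store is
    treated as "not visible yet" and retried a bounded number of times.
    Returns True if the object became visible, False otherwise.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = initial_delay_s
    for attempt in range(1, max_attempts + 1):
        if exists():
            return True
        if attempt < max_attempts:
            sleep(delay)             # back off before asking the store again
            delay *= backoff_factor  # wait longer after each miss
    return False
```

A caller would wrap its own existence check, for example 
`wait_until_visible(lambda: my_store_has(path))`, where `my_store_has` is a 
placeholder for the real lookup. The bound on attempts matters: as point 2 
explains, there is no upper limit on how long S3 may take, so retrying forever 
would just move the stall.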

Hope this helps,

Seth Wiesman

From: vinay patil <vinay18.pa...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Saturday, February 25, 2017 at 10:50 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Checkpointing with RocksDB as statebackend

Hi Stephan,
Just to avoid confusion here: I am using the S3 sink for writing the data and 
HDFS for storing checkpoints.
There are two core nodes (HDFS) and two task nodes on EMR.

I replaced the S3 sink with HDFS for writing data in my last test.
Let's say the checkpoint interval is 5 minutes. Within 5 minutes of running, the 
state size grows to 30 GB. After checkpointing, the 30 GB of state maintained 
in RocksDB has to be copied to HDFS, right? Is this what causes the pipeline to 
stall?
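
As a rough back-of-the-envelope check on the numbers above, the sketch below 
computes the sustained write rate needed to copy the state within one 
checkpoint interval. It assumes a full (non-incremental) copy of the whole 
30 GB, decimal units (1 GB = 1000 MB), and that the copy should finish before 
the next checkpoint is due; the function name is made up for this example, and 
the real figure depends on compression, parallelism, and how the copy is 
spread across nodes.

```python
def required_throughput_mb_per_s(state_size_gb: float, interval_minutes: float) -> float:
    """Sustained rate (MB/s) needed to copy `state_size_gb` within one interval.

    Assumes decimal units (1 GB = 1000 MB) and a full copy of the state.
    """
    if interval_minutes <= 0:
        raise ValueError("interval_minutes must be positive")
    return state_size_gb * 1000 / (interval_minutes * 60)


# 30 GB of state, 5-minute checkpoint interval (the figures from the message)
print(required_throughput_mb_per_s(30, 5))  # 100.0
```

Under those assumptions the cluster as a whole would have to sustain about 
100 MB/s of checkpoint writes to HDFS on top of its normal processing load.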

Regards,
Vinay Patil

On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
Hi Stephan,
To verify whether S3 is making the pipeline stall, I replaced the S3 sink with 
HDFS and set the minimum pause between checkpoints to 5 minutes. I still see 
the same issue, with checkpoints failing.
If I set the pause time to 20 seconds, all checkpoints complete; however, 
overall throughput takes a hit.



Regards,
Vinay Patil

On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing 
List archive.] <[hidden email]> wrote:
Flink's state backends currently do a good number of "make sure this exists" 
operations on the file systems. Through Hadoop's S3 filesystem, that translates 
to S3 bucket list operations, where there is a limit on how many operations may 
happen per time interval. After that, S3 blocks.

It seems that operations that are totally cheap on HDFS are hellishly expensive 
(and limited) on S3. It may be that you are affected by that.

We are gradually trying to improve the behavior there and be more S3 aware.

Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.

Best,
Stephan


On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:

Hi Stephan,

So do you mean that S3 is causing the stall? As I mentioned in my previous 
mail, I could not see any progress for 16 minutes because checkpoints were 
failing continuously.

On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List 
archive.]" <[hidden email]> wrote:
Hi Vinay!

True, the operator state (like Kafka) is currently not asynchronously 
checkpointed.

While it is rather small state, we have seen before that on S3 it can cause 
trouble, because S3 frequently stalls uploads of even data amounts as low as 
kilobytes due to its throttling policies.

That would be a super important fix to add!

Best,
Stephan


On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
Hi,

I have attached a snapshot for reference:
As you can see, all 3 checkpoints failed. Checkpoints 2 and 3 are
stuck at the Kafka source after 50%.
(The data sent so far by Kafka source 1 is 65 GB and by source 2 is
15 GB.)

Within 10 minutes 15M records were processed, and for the next 16 minutes the
pipeline was stuck. I don't see any progress beyond 15M because
checkpoints keep failing.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.








