Hi Seth,

Thank you for your suggestion.
But if the issue is only related to S3, why do I see the same behavior when I replace the S3 sink with an HDFS sink as well (for checkpointing I am using HDFS only)?

Stephan, another issue I see: when I set env.setBufferTimeout(-1) and keep the checkpoint interval at 10 minutes, nothing gets written to the sink (tried with S3 as well as HDFS); I was at least expecting pending files here. It gets worse when checkpointing is disabled, as nothing is written at all.

Regards,
Vinay Patil

On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <ml-node+s2336050n11943...@n4.nabble.com> wrote:

> Hi Seth!
>
> Wow, that is an awesome approach.
>
> We have actually seen these issues as well, and we are looking to eventually implement our own S3 file system (and circumvent Hadoop's S3 connector that Flink currently relies on): https://issues.apache.org/jira/browse/FLINK-5706
>
> Do you think your patch would be a good starting point for that, and would you be willing to share it?
>
> The Amazon AWS SDK for Java is Apache 2 licensed, so it is possible to fork it officially, if necessary...
>
> Greetings,
> Stephan
>
>
> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
>
>> Just wanted to throw in my 2 cents.
>>
>> I've been running pipelines with similar state size, using RocksDB which externalizes to S3, and bucketing to S3. I was getting stalls like this and ended up tracing the problem to S3 and the bucketing sink. The solution was two-fold:
>>
>> 1) I forked hadoop-aws and have it treat Flink as the source of truth. EMR uses a DynamoDB table to determine whether S3 is inconsistent. Instead, I say that if Flink believes a file exists on S3 and we don't see it, then I trust that Flink is in a consistent state and S3 is not. In that case, the various operations perform a back-off and retry up to a certain number of times.
>>
>> 2) The bucketing sink performs multiple renames over the lifetime of a file, once when a checkpoint starts and then again on notification after it completes. Due to S3's consistency guarantees, the second rename of a file can never be assured to work and will eventually fail, either during or after a checkpoint. Because there is no upper bound on the time it takes for a file on S3 to become consistent, retries cannot solve this particular problem, as it could take upwards of many minutes to rename, which would stall the entire pipeline. The only viable solution I could find was to write a custom sink which understands S3. Each writer writes its file locally and then copies it to S3 on checkpoint. By interacting with S3 only once per file, it circumvents the consistency issues altogether.
>>
>> Hope this helps,
>>
>> Seth Wiesman
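For illustration, here is a rough sketch of the kind of sink Seth describes: buffer records in a local file and upload that file to S3 only after the checkpoint that covers it has completed. This is not Seth's actual implementation; the bucket name, key layout and the plain AWS SDK client are placeholders, and restore/failure handling is left out.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.File;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Map;

// Sketch only: records are written to a local file; the file is uploaded to S3
// exactly once, after the checkpoint that covers it has completed.
public class LocalThenS3Sink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final String bucket;                      // placeholder, e.g. "my-bucket"
    private transient AmazonS3 s3;
    private transient File currentFile;
    private transient FileWriter writer;
    private transient Map<Long, File> pendingUploads; // files waiting for checkpoint completion

    public LocalThenS3Sink(String bucket) {
        this.bucket = bucket;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        s3 = new AmazonS3Client();                    // credentials from the environment (assumption)
        pendingUploads = new HashMap<>();
        roll();
    }

    @Override
    public void invoke(String value) throws Exception {
        writer.write(value);                          // only local I/O on the hot path
        writer.write('\n');
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // checkpoint barrier reached the sink: close the current file and
        // remember it under this checkpoint id
        writer.close();
        pendingUploads.put(context.getCheckpointId(), currentFile);
        roll();
        // a real implementation would also persist the pending-file list in
        // operator state so the upload can be redone after a restore
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // restore of pending files omitted in this sketch
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        File f = pendingUploads.remove(checkpointId);
        if (f != null) {
            // one interaction with S3 per file: no renames, so listing and
            // rename consistency never come into play
            s3.putObject(bucket, "output/" + f.getName(), f);
            f.delete();
        }
    }

    private void roll() throws Exception {
        currentFile = File.createTempFile("part-", ".local");
        writer = new FileWriter(currentFile);
    }
}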
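Seth's first point (back off and retry when S3 lags behind what Flink already knows) can be pictured in isolation as well. His actual change lives inside hadoop-aws, so the standalone helper below, including the doesObjectExist call and the retry bounds, is only an assumption-laden illustration of the idea.

import com.amazonaws.services.s3.AmazonS3;

// Illustration of the back-off-and-retry idea: if Flink believes a key exists
// but S3 does not show it yet, wait and re-check a bounded number of times.
public final class ConsistencyRetry {

    public static boolean waitUntilVisible(AmazonS3 s3, String bucket, String key,
                                           int maxAttempts, long initialBackoffMillis)
            throws InterruptedException {
        long backoff = initialBackoffMillis;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (s3.doesObjectExist(bucket, key)) {
                return true;                      // S3 has caught up
            }
            Thread.sleep(backoff);                // trust Flink, assume S3 is lagging
            backoff *= 2;                         // exponential back-off
        }
        return false;                             // still not visible after maxAttempts
    }
}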
>>
>> From: vinay patil <[hidden email]>
>> Date: Saturday, February 25, 2017 at 10:50 AM
>> To: [hidden email]
>> Subject: Re: Checkpointing with RocksDB as statebackend
>>
>> Hi Stephan,
>>
>> Just to avoid confusion here: I am using the S3 sink for writing the data and HDFS for storing checkpoints.
>>
>> There are 2 core nodes (HDFS) and two task nodes on EMR.
>>
>> I replaced the S3 sink with HDFS for writing data in my last test.
>>
>> Let's say the checkpoint interval is 5 minutes. Within 5 minutes of the run the state size grows to 30GB; after checkpointing, the 30GB of state maintained in RocksDB has to be copied to HDFS, right? Is this causing the pipeline to stall?
>>
>> Regards,
>> Vinay Patil
>>
>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> To verify whether S3 is making the pipeline stall, I replaced the S3 sink with HDFS and kept the minimum pause between checkpoints at 5 minutes; I still see the same issue with checkpoints failing.
>>
>> If I keep the pause at 20 seconds, all checkpoints complete, but there is a hit to the overall throughput.
>>
>> Regards,
>> Vinay Patil
>>
>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>
>> Flink's state backends currently do a good number of "make sure this exists" operations on the file systems. Through Hadoop's S3 filesystem, that translates to S3 bucket list operations, where there is a limit on how many operations may happen per time interval. After that, S3 blocks.
>>
>> It seems that operations that are totally cheap on HDFS are hellishly expensive (and limited) on S3. It may be that you are affected by that.
>>
>> We are gradually trying to improve the behavior there and be more S3-aware.
>>
>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>
>> Best,
>> Stephan
>>
>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> So do you mean that S3 is causing the stall? As I mentioned in my previous mail, I could not see any progress for 16 minutes because checkpoints were failing continuously.
>>
>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List archive.]" <[hidden email]> wrote:
>>
>> Hi Vinay!
>>
>> True, the operator state (like Kafka) is currently not asynchronously checkpointed.
>>
>> While it is rather small state, we have seen before that on S3 it can cause trouble, because S3 frequently stalls uploads of even data amounts as low as kilobytes due to its throttling policies.
>>
>> That would be a super important fix to add!
>>
>> Best,
>>
>> Stephan
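To make the configuration discussed up-thread concrete, here is a small sketch of the relevant settings (RocksDB state backend checkpointing to HDFS, checkpoint interval, minimum pause between checkpoints, and the network buffer timeout). The HDFS path and the numeric values are placeholders taken from the values mentioned in this thread, not recommendations.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB holds the working state locally; on each checkpoint the
        // snapshot is copied to this URI (HDFS here, placeholder path)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // checkpoint every 5 minutes, and enforce a minimum pause so a slow
        // checkpoint is not immediately followed by the next one
        env.enableCheckpointing(5 * 60 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20 * 1000);

        // setBufferTimeout(-1) disables flushing of half-full network buffers;
        // on a low-volume stream that can delay records reaching the sink for
        // a long time, which may contribute to "nothing gets written" symptoms
        env.setBufferTimeout(100);

        // ... sources, transformations and sinks go here ...
    }
}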
>>
>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>
>> Hi,
>>
>> I have attached a snapshot for reference. As you can see, all 3 checkpoints failed; for checkpoint IDs 2 and 3 it is stuck at the Kafka source after 50%. (The data sent so far by Kafka source 1 is 65GB and by source 2 is 15GB.)
>>
>> Within 10 minutes 15M records were processed, and for the next 16 minutes the pipeline is stuck. I don't see any progress beyond 15M because checkpoints keep failing.
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>