Hi Vinay! Savepoints also call the same problematic RocksDB function, unfortunately.
We will have a fix next month. We will either (1) get a patched RocksDB version or (2) implement a different pattern for ListState in Flink.

(1) would be the better solution, so we are waiting for a response from the RocksDB folks. (2) is always possible if we cannot get a fix from RocksDB.

Stephan

On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <vinay18.pa...@gmail.com> wrote:

> Hi Stephan,
>
> Thank you for making me aware of this.
>
> Yes, I am using a window without a reduce function (an apply function). The
> discussion happening on JIRA is exactly what I am observing: consistent
> failure of checkpoints after some time, and then the stream halts.
>
> We want to go live next month, and I am not sure how this will affect us in
> production, as we are going to get more than 200 million records.
>
> As a workaround, can I take a savepoint while the pipeline is running?
> Let's say I take a savepoint every 30 minutes, will that work?
>
> Regards,
> Vinay Patil
>
> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>>
>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]> wrote:
>>
>>> Hi Vinay,
>>>
>>> I think the issue is tracked here:
>>> https://github.com/facebook/rocksdb/issues/1988.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 14.03.2017 at 15:31, Vishnu Viswanath <[hidden email]> wrote:
>>>
>>> Hi Stephan,
>>>
>>> Is there a ticket number/link to track this? My job has all the
>>> conditions you mentioned.
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]> wrote:
>>>
>>>> Hi Vinay!
>>>>
>>>> We just discovered a bug in RocksDB.
>>>> The bug affects windows without reduce() or fold(), windows with
>>>> evictors, and ListState.
>>>>
>>>> A certain access pattern in RocksDB becomes so slow beyond a certain
>>>> size-per-key that it basically brings down the streaming program
>>>> and the snapshots.
>>>>
>>>> We are reaching out to the RocksDB folks and looking for workarounds in
>>>> Flink.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]> wrote:
>>>>
>>>>> @vinay Can you try not setting the buffer timeout at all? I am
>>>>> actually not sure what the effect of setting it to a negative
>>>>> value would be; that could be a cause of problems...
>>>>>
>>>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]> wrote:
>>>>>
>>>>>> Vinay,
>>>>>>
>>>>>> The bucketing sink performs rename operations during the checkpoint,
>>>>>> and if it tries to rename a file that is not yet consistent, that
>>>>>> causes a FileNotFound exception, which fails the checkpoint.
>>>>>>
>>>>>> Stephan,
>>>>>>
>>>>>> Currently my aws fork contains some very specific assumptions about
>>>>>> the pipeline that will in general only hold for my pipeline. This is
>>>>>> because there were still some open questions that I had about how to
>>>>>> solve consistency issues in the general case. I will comment on the
>>>>>> Jira issue with more specifics.
>>>>>>
>>>>>> Seth Wiesman
>>>>>>
>>>>>> *From: *vinay patil <[hidden email]>
>>>>>> *Reply-To: *"[hidden email]" <[hidden email]>
>>>>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>>>>> *To: *"[hidden email]" <[hidden email]>
>>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>>
>>>>>> Hi Seth,
>>>>>>
>>>>>> Thank you for your suggestion.
>>>>>>
>>>>>> But if the issue is only related to S3, then why does this also happen
>>>>>> when I replace the S3 sink with HDFS? (For checkpointing I am using
>>>>>> HDFS only.)
>>>>>>
>>>>>> Stephan,
>>>>>>
>>>>>> Another issue I see is that when I set env.setBufferTimeout(-1) and keep
>>>>>> the checkpoint interval at 10 minutes, nothing gets written to the sink
>>>>>> (tried with S3 as well as HDFS); at least I was expecting
>>>>>> pending files here.
>>>>>>
>>>>>> This issue gets worse when checkpointing is disabled, as nothing is
>>>>>> written.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
>>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Seth!
>>>>>>
>>>>>> Wow, that is an awesome approach.
>>>>>>
>>>>>> We have actually seen these issues as well and we are looking to
>>>>>> eventually implement our own S3 file system (and circumvent Hadoop's S3
>>>>>> connector that Flink currently relies on):
>>>>>> https://issues.apache.org/jira/browse/FLINK-5706
>>>>>>
>>>>>> Do you think your patch would be a good starting point for that and
>>>>>> would you be willing to share it?
>>>>>>
>>>>>> The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible
>>>>>> to fork officially, if necessary...
>>>>>>
>>>>>> Greetings,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
>>>>>>
>>>>>> Just wanted to throw in my 2cts.
>>>>>>
>>>>>> I’ve been running pipelines with similar state size using rocksdb
>>>>>> which externalize to S3 and bucket to S3. I was getting stalls like this
>>>>>> and ended up tracing the problem to S3 and the bucketing sink. The
>>>>>> solution was twofold:
>>>>>>
>>>>>> 1) I forked hadoop-aws and have it treat flink as a source of
>>>>>> truth. EMR uses a dynamodb table to determine if S3 is inconsistent.
>>>>>> Instead I say that if flink believes that a file exists on S3 and we
>>>>>> don’t see it, then I am going to trust that flink is in a consistent
>>>>>> state and S3 is not. In this case, various operations will perform a
>>>>>> back off and retry up to a certain number of times.
>>>>>>
>>>>>> 2) The bucketing sink performs multiple renames over the
>>>>>> lifetime of a file, occurring when a checkpoint starts and then again on
>>>>>> notification after it completes. Due to S3’s consistency guarantees the
>>>>>> second rename of a file can never be assured to work and will eventually
>>>>>> fail either during or after a checkpoint.
>>>>>> Because there is no upper bound on the time it will take for a file on
>>>>>> S3 to become consistent, retries cannot solve this specific problem, as
>>>>>> it could take upwards of many minutes to rename, which would stall the
>>>>>> entire pipeline. The only viable solution I could find was to write a
>>>>>> custom sink which understands S3. Each writer will write a file locally
>>>>>> and then copy it to S3 on checkpoint. By only interacting with S3 once
>>>>>> per file it can circumvent consistency issues altogether.
>>>>>>
>>>>>> Hope this helps,
>>>>>>
>>>>>> Seth Wiesman
>>>>>>
>>>>>> *From: *vinay patil <[hidden email]>
>>>>>> *Reply-To: *"[hidden email]" <[hidden email]>
>>>>>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>>>>>> *To: *"[hidden email]" <[hidden email]>
>>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Just to avoid confusion here: I am using the S3 sink for writing the
>>>>>> data, and using HDFS for storing checkpoints.
>>>>>>
>>>>>> There are two core nodes (HDFS) and two task nodes on EMR.
>>>>>>
>>>>>> I replaced the S3 sink with HDFS for writing data in my last test.
>>>>>>
>>>>>> Let's say the checkpoint interval is 5 minutes. Within 5 minutes
>>>>>> of running, the state size grows to 30GB. After checkpointing, the 30GB
>>>>>> state that is maintained in RocksDB has to be copied to HDFS, right? Is
>>>>>> this causing the pipeline to stall?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> To verify whether S3 is making the pipeline stall, I have replaced the S3
>>>>>> sink with HDFS and set the minimum pause between checkpoints to 5 minutes;
>>>>>> I still see the same issue with checkpoints failing.
>>>>>>
>>>>>> If I set the pause time to 20 seconds, all checkpoints
>>>>>> complete; however, there is a hit in overall throughput.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
>>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>> Flink's state backends currently do a good number of "make sure this
>>>>>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>>>>>> that translates to S3 bucket list operations, where there is a limit on
>>>>>> how many operations may happen per time interval. After that, S3 blocks.
>>>>>>
>>>>>> It seems that operations that are totally cheap on HDFS are hellishly
>>>>>> expensive (and limited) on S3. It may be that you are affected by that.
>>>>>>
>>>>>> We are gradually trying to improve the behavior there and be more S3
>>>>>> aware.
>>>>>>
>>>>>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> So do you mean that S3 is causing the stall? As I mentioned in
>>>>>> my previous mail, I could not see any progress for 16 minutes, as
>>>>>> checkpoints were failing continuously.
>>>>>>
>>>>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>>>>>> List archive.]" <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Vinay!
>>>>>>
>>>>>> True, the operator state (like Kafka) is currently not asynchronously
>>>>>> checkpointed.
>>>>>>
>>>>>> While it is rather small state, we have seen before that on S3 it can
>>>>>> cause trouble, because S3 frequently stalls uploads of even data amounts
>>>>>> as low as kilobytes due to its throttling policies.
>>>>>>
>>>>>> That would be a super important fix to add!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have attached a snapshot for reference:
>>>>>> As you can see, all 3 checkpoints failed; for checkpoint IDs 2
>>>>>> and 3 it is stuck at the Kafka source after 50%.
>>>>>> (The data sent till now by Kafka source 1 is 65GB and by source
>>>>>> 2 is 15GB.)
>>>>>>
>>>>>> Within 10 minutes 15M records were processed, and for the next
>>>>>> 16 minutes the pipeline is stuck. I don't see any progress beyond 15M
>>>>>> because checkpoints are failing consistently.
>>>>>>
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>>>>>>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>> archive at Nabble.com.
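A note on Seth's point (1) in the thread above: the "back off and retry up to a certain number of times" behaviour he describes can be sketched in plain Java roughly as follows. This is only an illustration, not the code from his hadoop-aws fork (which was not shared in this thread). The class name ConsistencyRetry, the method awaitVisible and its parameters are hypothetical, and the existence check is passed in as a callback rather than tied to any real S3 or Hadoop API.

```java
import java.util.function.BooleanSupplier;

public class ConsistencyRetry {

    /**
     * Polls the (hypothetical) visibility check until it returns true or the
     * attempts are used up, doubling the wait between attempts.
     * Returns true if the object became visible, false otherwise.
     */
    public static boolean awaitVisible(BooleanSupplier visible,
                                       int maxAttempts,
                                       long initialDelayMillis) throws InterruptedException {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (visible.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                // Back off before the next check; no sleep after the last one.
                Thread.sleep(delay);
                delay *= 2;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated store: the file only becomes visible on the third check.
        int[] checks = {0};
        boolean ok = awaitVisible(() -> ++checks[0] >= 3, 5, 10);
        System.out.println("visible=" + ok + " after " + checks[0] + " checks");
    }
}
```

As Seth points out in (2), a bounded retry like this does not help with the bucketing sink's second rename, because there is no upper bound on how long S3 may take to become consistent.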