The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi Vinay,
>
> I think the issue is tracked here: https://github.com/facebook/rocksdb/issues/1988.
>
> Best,
> Stefan
>
> On 14.03.2017 at 15:31, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:
>
> Hi Stephan,
>
> Is there a ticket number or link to track this? My job meets all the conditions you mentioned.
>
> Thanks,
> Vishnu
>
> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Vinay!
>>
>> We just discovered a bug in RocksDB. The bug affects windows without reduce() or fold(), windows with evictors, and ListState.
>>
>> A certain access pattern in RocksDB becomes so slow beyond a certain size per key that it basically brings down the streaming program and the snapshots.
>>
>> We are reaching out to the RocksDB folks and looking for workarounds in Flink.
>>
>> Greetings,
>> Stephan
>>
>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> @vinay Can you try not setting the buffer timeout at all? I am actually not sure what the effect of setting it to a negative value would be; that could be a cause of problems...
>>>
>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <swies...@mediamath.com> wrote:
>>>
>>>> Vinay,
>>>>
>>>> The bucketing sink performs rename operations during the checkpoint, and if it tries to rename a file that is not yet consistent, that would cause a FileNotFound exception, which would fail the checkpoint.
>>>>
>>>> Stephan,
>>>>
>>>> Currently my AWS fork contains some very specific assumptions about the pipeline that will in general only hold for my pipeline. This is because there were still some open questions that I had about how to solve consistency issues in the general case. I will comment on the Jira issue with more specifics.
>>>>
>>>> Seth Wiesman
>>>>
>>>> *From:* vinay patil <vinay18.pa...@gmail.com>
>>>> *Reply-To:* "user@flink.apache.org" <user@flink.apache.org>
>>>> *Date:* Monday, February 27, 2017 at 1:05 PM
>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>> *Subject:* Re: Checkpointing with RocksDB as statebackend
>>>>
>>>> Hi Seth,
>>>>
>>>> Thank you for your suggestion.
>>>>
>>>> But if the issue is only related to S3, then why does this also happen when I replace the S3 sink with HDFS? (For checkpointing I am using HDFS only.)
>>>>
>>>> Stephan,
>>>>
>>>> Another issue I see: when I set env.setBufferTimeout(-1) and keep the checkpoint interval at 10 minutes, nothing gets written to the sink (tried with S3 as well as HDFS). At least I was expecting pending files here.
>>>>
>>>> This issue gets worse when checkpointing is disabled, as nothing is written.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>> Hi Seth!
>>>>
>>>> Wow, that is an awesome approach.
>>>>
>>>> We have actually seen these issues as well and we are looking to eventually implement our own S3 file system (and circumvent Hadoop's S3 connector that Flink currently relies on): https://issues.apache.org/jira/browse/FLINK-5706
>>>>
>>>> Do you think your patch would be a good starting point for that and would you be willing to share it?
>>>>
>>>> The Amazon AWS SDK for Java is Apache 2 licensed, so it is possible to fork it officially, if necessary...
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
>>>>
>>>> Just wanted to throw in my 2cts.
>>>>
>>>> I’ve been running pipelines with similar state size using RocksDB which externalize to S3 and bucket to S3. I was getting stalls like this and ended up tracing the problem to S3 and the bucketing sink. The solution was twofold:
>>>>
>>>> 1) I forked hadoop-aws and have it treat Flink as the source of truth. EMR uses a DynamoDB table to determine if S3 is inconsistent. Instead, I say that if Flink believes that a file exists on S3 and we don’t see it, then I am going to trust that Flink is in a consistent state and S3 is not. In this case, various operations will perform a back-off and retry up to a certain number of times.
>>>>
>>>> 2) The bucketing sink performs multiple renames over the lifetime of a file, occurring when a checkpoint starts and then again on notification after it completes. Due to S3’s consistency guarantees, the second rename of a file can never be assured to work and will eventually fail either during or after a checkpoint. Because there is no upper bound on the time it will take for a file on S3 to become consistent, retries cannot solve this specific problem, as it could take upwards of many minutes to rename, which would stall the entire pipeline. The only viable solution I could find was to write a custom sink which understands S3. Each writer will write a file locally and then copy it to S3 on checkpoint. By only interacting with S3 once per file it can circumvent consistency issues altogether.
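The back-off-and-retry behaviour from point 1) above can be sketched roughly as follows. This is only an illustration and not Seth's actual hadoop-aws fork: the function name `wait_until_visible`, its parameters, and the `exists` callback (standing in for an S3 existence check) are all hypothetical. As point 2) notes, a bounded retry like this does not help with the second rename, because S3 gives no upper bound on when a file becomes visible.

```python
import time
from typing import Callable


def wait_until_visible(
    exists: Callable[[], bool],
    max_attempts: int = 5,
    initial_delay_s: float = 0.1,
    backoff_factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll `exists` until it returns True or the attempts run out.

    The caller (Flink, in the thread's scenario) is treated as the source
    of truth: if it believes the file exists and the store does not show
    it yet, we back off and look again instead of failing immediately.
    Returns True if the file became visible, False otherwise.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = initial_delay_s
    for attempt in range(1, max_attempts + 1):
        if exists():
            return True
        if attempt < max_attempts:
            # Hypothetical policy: exponential back-off between checks.
            sleep(delay)
            delay *= backoff_factor
    return False


if __name__ == "__main__":
    # Simulated store: the file only becomes visible on the third check.
    checks = iter([False, False, True])
    print(wait_until_visible(lambda: next(checks), sleep=lambda s: None))
```

The `sleep` parameter is injectable only so the sketch can be exercised without real waiting; a caller would normally leave it at the default.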
>>>>
>>>> Hope this helps,
>>>>
>>>> Seth Wiesman
>>>>
>>>> *From:* vinay patil <[hidden email]>
>>>> *Reply-To:* "[hidden email]" <[hidden email]>
>>>> *Date:* Saturday, February 25, 2017 at 10:50 AM
>>>> *To:* "[hidden email]" <[hidden email]>
>>>> *Subject:* Re: Checkpointing with RocksDB as statebackend
>>>>
>>>> Hi Stephan,
>>>>
>>>> Just to avoid confusion here: I am using the S3 sink for writing the data, and using HDFS for storing checkpoints.
>>>>
>>>> There are two core nodes (HDFS) and two task nodes on EMR.
>>>>
>>>> I replaced the S3 sink with HDFS for writing data in my last test.
>>>>
>>>> Let's say the checkpoint interval is 5 minutes. Within 5 minutes of running, the state size grows to 30 GB. After checkpointing, the 30 GB of state that is maintained in RocksDB has to be copied to HDFS, right? Is this causing the pipeline to stall?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>> To verify whether S3 is making the pipeline stall, I replaced the S3 sink with HDFS and set the minimum pause between checkpoints to 5 minutes. I still see the same issue, with checkpoints failing.
>>>>
>>>> If I set the pause time to 20 seconds, all checkpoints complete; however, there is a hit in overall throughput.
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing List archive.]
<[hidden email]> wrote:
>>>>
>>>> Flink's state backends currently do a good number of "make sure this exists" operations on the file systems. Through Hadoop's S3 filesystem, that translates to S3 bucket list operations, where there is a limit on how many operations may happen per time interval. After that, S3 blocks.
>>>>
>>>> It seems that operations that are totally cheap on HDFS are hellishly expensive (and limited) on S3. It may be that you are affected by that.
>>>>
>>>> We are gradually trying to improve the behavior there and be more S3 aware.
>>>>
>>>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>> Hi Stephan,
>>>>
>>>> So do you mean that S3 is causing the stall? As I mentioned in my previous mail, I could not see any progress for 16 minutes, as checkpoints were failing continuously.
>>>>
>>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List archive.]" <[hidden email]> wrote:
>>>>
>>>> Hi Vinay!
>>>>
>>>> True, the operator state (like Kafka) is currently not asynchronously checkpointed.
>>>>
>>>> While it is rather small state, we have seen before that on S3 it can cause trouble, because S3 frequently stalls uploads of even data amounts as low as kilobytes due to its throttling policies.
>>>>
>>>> That would be a super important fix to add!
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have attached a snapshot for reference. As you can see, all 3 checkpoints failed; for checkpoint IDs 2 and 3 it is stuck at the Kafka source after 50%. (The data sent so far by Kafka source 1 is 65 GB, and by source 2 is 15 GB.)
>>>>
>>>> Within 10 minutes 15M records were processed, and for the next 16 minutes the pipeline is stuck. I don't see any progress beyond 15M because the checkpoints are failing consistently.
>>>>
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>
>>>>
>>>> --
>>>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html
>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.