Hi Vinay, I think the issue is tracked here: https://github.com/facebook/rocksdb/issues/1988 <https://github.com/facebook/rocksdb/issues/1988>.
Best, Stefan > Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath > <vishnu.viswanat...@gmail.com>: > > Hi Stephan, > > Is there a ticket number/link to track this, My job has all the conditions > you mentioned. > > Thanks, > Vishnu > > On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <se...@apache.org > <mailto:se...@apache.org>> wrote: > Hi Vinay! > > We just discovered a bug in RocksDB. The bug affects windows without reduce() > or fold(), windows with evictors, and ListState. > > A certain access pattern in RocksDB starts being so slow after a certain > size-per-key that it basically brings down the streaming program and the > snapshots. > > We are reaching out to the RocksDB folks and looking for workarounds in Flink. > > Greetings, > Stephan > > > On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <se...@apache.org > <mailto:se...@apache.org>> wrote: > @vinay Can you try to not set the buffer timeout at all? I am actually not > sure what would be the effect of setting it to a negative value, that can be > a cause of problems... > > > On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <swies...@mediamath.com > <mailto:swies...@mediamath.com>> wrote: > Vinay, > > > > The bucketing sink performs rename operations during the checkpoint and if it > tries to rename a file that is not yet consistent that would cause a > FileNotFound exception which would fail the checkpoint. > > > > Stephan, > > > > Currently my aws fork contains some very specific assumptions about the > pipeline that will in general only hold for my pipeline. This is because > there were still some open questions that I had about how to solve > consistency issues in the general case. I will comment on the Jira issue with > more specific. > > > > Seth Wiesman > > > > From: vinay patil <vinay18.pa...@gmail.com <mailto:vinay18.pa...@gmail.com>> > Reply-To: "user@flink.apache.org <mailto:user@flink.apache.org>" > <user@flink.apache.org <mailto:user@flink.apache.org>> > Date: Monday, February 27, 2017 at 1:05 PM > To: "user@flink.apache.org <mailto:user@flink.apache.org>" > <user@flink.apache.org <mailto:user@flink.apache.org>> > > > Subject: Re: Checkpointing with RocksDB as statebackend > > > > Hi Seth, > > Thank you for your suggestion. > > But if the issue is only related to S3, then why does this happen when I > replace the S3 sink to HDFS as well (for checkpointing I am using HDFS only ) > > Stephan, > > Another issue I see is when I set env.setBufferTimeout(-1) , and keep the > checkpoint interval to 10minutes, I have observed that nothing gets written > to sink (tried with S3 as well as HDFS), atleast I was expecting pending > files here. > > This issue gets worst when checkpointing is disabled as nothing is written. > > > > > > Regards, > > Vinay Patil > > > > On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User Mailing > List archive.] <[hidden email] <>> wrote: > > Hi Seth! > > > > Wow, that is an awesome approach. > > > > We have actually seen these issues as well and we are looking to eventually > implement our own S3 file system (and circumvent Hadoop's S3 connector that > Flink currently relies on): https://issues.apache.org/jira/browse/FLINK-5706 > <https://issues.apache.org/jira/browse/FLINK-5706> > > > Do you think your patch would be a good starting point for that and would you > be willing to share it? > > > > The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible to fork > officially, if necessary... > > > > Greetings, > > Stephan > > > > > > > > On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11943&i=0>> wrote: > > Just wanted to throw in my 2cts. > > > > I’ve been running pipelines with similar state size using rocksdb which > externalize to S3 and bucket to S3. I was getting stalls like this and ended > up tracing the problem to S3 and the bucketing sink. The solution was two > fold: > > > > 1) I forked hadoop-aws and have it treat flink as a source of truth. > Emr uses a dynamodb table to determine if S3 is inconsistent. Instead I say > that if flink believes that a file exists on S3 and we don’t see it then I am > going to trust that flink is in a consistent state and S3 is not. In this > case, various operations will perform a back off and retry up to a certain > number of times. > > > > 2) The bucketing sink performs multiple renames over the lifetime of a > file, occurring when a checkpoint starts and then again on notification after > it completes. Due to S3’s consistency guarantees the second rename of file > can never be assured to work and will eventually fail either during or after > a checkpoint. Because there is no upper bound on the time it will take for a > file on S3 to become consistent, retries cannot solve this specific problem > as it could take upwards of many minutes to rename which would stall the > entire pipeline. The only viable solution I could find was to write a custom > sink which understands S3. Each writer will write file locally and then copy > it to S3 on checkpoint. By only interacting with S3 once per file it can > circumvent consistency issues all together. > > > > Hope this helps, > > > > Seth Wiesman > > > > From: vinay patil <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11943&i=1>> > Reply-To: "[hidden email] > <http://user/SendEmail.jtp?type=node&node=11943&i=2>" <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11943&i=3>> > Date: Saturday, February 25, 2017 at 10:50 AM > To: "[hidden email] <http://user/SendEmail.jtp?type=node&node=11943&i=4>" > <[hidden email] <http://user/SendEmail.jtp?type=node&node=11943&i=5>> > Subject: Re: Checkpointing with RocksDB as statebackend > > > > HI Stephan, > > Just to avoid the confusion here, I am using S3 sink for writing the data, > and using HDFS for storing checkpoints. > > There are 2 core nodes (HDFS) and two task nodes on EMR > > > I replaced s3 sink with HDFS for writing data in my last test. > > Let's say the checkpoint interval is 5 minutes, now within 5minutes of run > the state size grows to 30GB , after checkpointing the 30GB state that is > maintained in rocksDB has to be copied to HDFS, right ? is this causing the > pipeline to stall ? > > > > Regards, > > Vinay Patil > > > > On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote: > > Hi Stephan, > > To verify if S3 is making teh pipeline stall, I have replaced the S3 sink > with HDFS and kept minimum pause between checkpoints to 5minutes, still I see > the same issue with checkpoints getting failed. > > If I keep the pause time to 20 seconds, all checkpoints are completed , > however there is a hit in overall throughput. > > > > > > Regards, > > Vinay Patil > > > > On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing > List archive.] <[hidden email]> wrote: > > Flink's state backends currently do a good number of "make sure this exists" > operations on the file systems. Through Hadoop's S3 filesystem, that > translates to S3 bucket list operations, where there is a limit in how many > operation may happen per time interval. After that, S3 blocks. > > > > It seems that operations that are totally cheap on HDFS are hellishly > expensive (and limited) on S3. It may be that you are affected by that. > > > > We are gradually trying to improve the behavior there and be more S3 aware. > > > > Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there. > > > > Best, > > Stephan > > > > > > On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11891&i=0>> wrote: > > Hi Stephan, > > So do you mean that S3 is causing the stall , as I have mentioned in my > previous mail, I could not see any progress for 16minutes as checkpoints were > getting failed continuously. > > > > On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List > archive.]" <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11887&i=0>> wrote: > > Hi Vinay! > > > > True, the operator state (like Kafka) is currently not asynchronously > checkpointed. > > > > While it is rather small state, we have seen before that on S3 it can cause > trouble, because S3 frequently stalls uploads of even data amounts as low as > kilobytes due to its throttling policies. > > > > That would be a super important fix to add! > > > > Best, > > Stephan > > > > > > On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email] > <http://user/SendEmail.jtp?type=node&node=11885&i=0>> wrote: > > Hi, > > I have attached a snapshot for reference: > As you can see all the 3 checkpointins failed , for checkpoint ID 2 and 3 it > is stuck at the Kafka source after 50% > (The data sent till now by Kafka source 1 is 65GB and sent by source 2 is > 15GB ) > > Within 10minutes 15M records were processed, and for the next 16minutes the > pipeline is stuck , I don't see any progress beyond 15M because of > checkpoints getting failed consistently. > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882/Checkpointing_Failed.png>> > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11882.html> > Sent from the Apache Flink User Mailing List archive. mailing list archive at > Nabble.com. > > > > > > If you reply to this email, your message will be added to the discussion > below: > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11885.html > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11885.html> > To start a new topic under Apache Flink User Mailing List archive., email > [hidden email] <http://user/SendEmail.jtp?type=node&node=11887&i=1> > To unsubscribe from Apache Flink User Mailing List archive., click here. > NAML > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> > > > View this message in context: Re: Checkpointing with RocksDB as statebackend > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11887.html> > Sent from the Apache Flink User Mailing List archive. mailing list archive > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at > Nabble.com. > > > > > > If you reply to this email, your message will be added to the discussion > below: > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11891.html > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11891.html> > To start a new topic under Apache Flink User Mailing List archive., email > [hidden email] > To unsubscribe from Apache Flink User Mailing List archive., click here. > NAML > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> > > > > > > > View this message in context: Re: Checkpointing with RocksDB as statebackend > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11913.html> > Sent from the Apache Flink User Mailing List archive. mailing list archive > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at > Nabble.com. > > > > > > If you reply to this email, your message will be added to the discussion > below: > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11943.html > > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11943.html> > To start a new topic under Apache Flink User Mailing List archive., email > [hidden email] <> > To unsubscribe from Apache Flink User Mailing List archive., click here <>. > NAML > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> > > > > > View this message in context: Re: Checkpointing with RocksDB as statebackend > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11949.html> > Sent from the Apache Flink User Mailing List archive. mailing list archive > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at > Nabble.com. > > > > >