Hi Vinay!

Savepoints also call the same problematic RocksDB function, unfortunately.

We will have a fix next month. We either (1) get a patched RocksDB version
or we (2) implement a different pattern for ListState in Flink.

(1) would be the better solution, so we are waiting for a response from the
RocksDB folks. (2) is always possible if we cannot get a fix from RocksDB.

Stephan


On Wed, Mar 15, 2017 at 5:53 PM, vinay patil <vinay18.pa...@gmail.com>
wrote:

> Hi Stephan,
>
> Thank you for making me aware of this.
>
> Yes I am using a window without reduce function (Apply function). The
> discussion happening on JIRA is exactly what I am observing, consistent
> failure of checkpoints after some time and the stream halts.
>
> We want to go live in next month, not sure how this will affect in
> production as we are going to get above 200 million data.
>
> As a workaround can I take the savepoint while the pipeline is running ?
> Let's say if I take savepoint after every 30minutes, will it work ?
>
>
>
> Regards,
> Vinay Patil
>
> On Tue, Mar 14, 2017 at 10:02 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=12224&i=0>> wrote:
>
>> The issue in Flink is https://issues.apache.org/jira/browse/FLINK-5756
>>
>> On Tue, Mar 14, 2017 at 3:40 PM, Stefan Richter <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=12209&i=0>> wrote:
>>
>>> Hi Vinay,
>>>
>>> I think the issue is tracked here: https://github.com/faceb
>>> ook/rocksdb/issues/1988.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 14.03.2017 um 15:31 schrieb Vishnu Viswanath <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=1>>:
>>>
>>> Hi Stephan,
>>>
>>> Is there a ticket number/link to track this, My job has all the
>>> conditions you mentioned.
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Mar 14, 2017 at 7:13 AM, Stephan Ewen <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=2>> wrote:
>>>
>>>> Hi Vinay!
>>>>
>>>> We just discovered a bug in RocksDB. The bug affects windows without
>>>> reduce() or fold(), windows with evictors, and ListState.
>>>>
>>>> A certain access pattern in RocksDB starts being so slow after a
>>>> certain size-per-key that it basically brings down the streaming program
>>>> and the snapshots.
>>>>
>>>> We are reaching out to the RocksDB folks and looking for workarounds in
>>>> Flink.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Mar 1, 2017 at 12:10 PM, Stephan Ewen <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=3>> wrote:
>>>>
>>>>> @vinay  Can you try to not set the buffer timeout at all? I am
>>>>> actually not sure what would be the effect of setting it to a negative
>>>>> value, that can be a cause of problems...
>>>>>
>>>>>
>>>>> On Mon, Feb 27, 2017 at 7:44 PM, Seth Wiesman <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=4>> wrote:
>>>>>
>>>>>> Vinay,
>>>>>>
>>>>>>
>>>>>>
>>>>>> The bucketing sink performs rename operations during the checkpoint
>>>>>> and if it tries to rename a file that is not yet consistent that would
>>>>>> cause a FileNotFound exception which would fail the checkpoint.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Stephan,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Currently my aws fork contains some very specific assumptions about
>>>>>> the pipeline that will in general only hold for my pipeline. This is
>>>>>> because there were still some open questions that  I had about how to 
>>>>>> solve
>>>>>> consistency issues in the general case. I will comment on the Jira issue
>>>>>> with more specific.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Seth Wiesman
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *vinay patil <[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=5>>
>>>>>> *Reply-To: *"[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=6>" <[hidden
>>>>>> email] <http:///user/SendEmail.jtp?type=node&node=12209&i=7>>
>>>>>> *Date: *Monday, February 27, 2017 at 1:05 PM
>>>>>> *To: *"[hidden email]
>>>>>> <http:///user/SendEmail.jtp?type=node&node=12209&i=8>" <[hidden
>>>>>> email] <http:///user/SendEmail.jtp?type=node&node=12209&i=9>>
>>>>>>
>>>>>>
>>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Seth,
>>>>>>
>>>>>> Thank you for your suggestion.
>>>>>>
>>>>>> But if the issue is only related to S3, then why does this happen
>>>>>> when I replace the S3 sink  to HDFS as well (for checkpointing I am using
>>>>>> HDFS only )
>>>>>>
>>>>>> Stephan,
>>>>>>
>>>>>> Another issue I see is when I set env.setBufferTimeout(-1) , and keep
>>>>>> the checkpoint interval to 10minutes, I have observed that nothing gets
>>>>>> written to sink (tried with S3 as well as HDFS), atleast I was expecting
>>>>>> pending files here.
>>>>>>
>>>>>> This issue gets worst when checkpointing is disabled  as nothing is
>>>>>> written.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
>>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Seth!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Wow, that is an awesome approach.
>>>>>>
>>>>>>
>>>>>>
>>>>>> We have actually seen these issues as well and we are looking to
>>>>>> eventually implement our own S3 file system (and circumvent Hadoop's S3
>>>>>> connector that Flink currently relies on): https://issues.apache.org
>>>>>> /jira/browse/FLINK-5706
>>>>>>
>>>>>>
>>>>>>
>>>>>> Do you think your patch would be a good starting point for that and
>>>>>> would you be willing to share it?
>>>>>>
>>>>>>
>>>>>>
>>>>>> The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible
>>>>>> to fork officially, if necessary...
>>>>>>
>>>>>>
>>>>>>
>>>>>> Greetings,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=0>> wrote:
>>>>>>
>>>>>> Just wanted to throw in my 2cts.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I’ve been running pipelines with similar state size using rocksdb
>>>>>> which externalize to S3 and bucket to S3. I was getting stalls like this
>>>>>> and ended up tracing the problem to S3 and the bucketing sink. The 
>>>>>> solution
>>>>>> was two fold:
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1)       I forked hadoop-aws and have it treat flink as a source of
>>>>>> truth. Emr uses a dynamodb table to determine if S3 is inconsistent.
>>>>>> Instead I say that if flink believes that a file exists on S3 and we 
>>>>>> don’t
>>>>>> see it then I am going to trust that flink is in a consistent state and 
>>>>>> S3
>>>>>> is not. In this case, various operations will perform a back off and 
>>>>>> retry
>>>>>> up to a certain number of times.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2)       The bucketing sink performs multiple renames over the
>>>>>> lifetime of a file, occurring when a checkpoint starts and then again on
>>>>>> notification after it completes. Due to S3’s consistency guarantees the
>>>>>> second rename of file can never be assured to work and will eventually 
>>>>>> fail
>>>>>> either during or after a checkpoint. Because there is no upper bound on 
>>>>>> the
>>>>>> time it will take for a file on S3 to become consistent, retries cannot
>>>>>> solve this specific problem as it could take upwards of many minutes to
>>>>>> rename which would stall the entire pipeline. The only viable solution I
>>>>>> could find was to write a custom sink which understands S3. Each writer
>>>>>> will write file locally and then copy it to S3 on checkpoint. By only
>>>>>> interacting with S3 once per file it can circumvent consistency issues 
>>>>>> all
>>>>>> together.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hope this helps,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Seth Wiesman
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *vinay patil <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=1>>
>>>>>> *Reply-To: *"[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=2>" <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=3>>
>>>>>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>>>>>> *To: *"[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=4>" <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11943&i=5>>
>>>>>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>>>>>
>>>>>>
>>>>>>
>>>>>> HI Stephan,
>>>>>>
>>>>>> Just to avoid the confusion here, I am using S3 sink for writing the
>>>>>> data, and using HDFS for storing checkpoints.
>>>>>>
>>>>>> There are 2 core nodes (HDFS) and two task nodes on EMR
>>>>>>
>>>>>>
>>>>>> I replaced s3 sink with HDFS for writing data in my last test.
>>>>>>
>>>>>> Let's say the checkpoint interval is 5 minutes, now within 5minutes
>>>>>> of run the state size grows to 30GB ,  after checkpointing the 30GB state
>>>>>> that is maintained in rocksDB has to be copied to HDFS, right ?  is this
>>>>>> causing the pipeline to stall ?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> To verify if S3 is making teh pipeline stall, I have replaced the S3
>>>>>> sink with HDFS and kept minimum pause between checkpoints to 5minutes,
>>>>>> still I see the same issue with checkpoints getting failed.
>>>>>>
>>>>>> If I keep the  pause time to 20 seconds, all checkpoints are
>>>>>> completed , however there is a hit in overall throughput.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Vinay Patil
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
>>>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>> Flink's state backends currently do a good number of "make sure this
>>>>>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>>>>>> that translates to S3 bucket list operations, where there is a limit in 
>>>>>> how
>>>>>> many operation may happen per time interval. After that, S3 blocks.
>>>>>>
>>>>>>
>>>>>>
>>>>>> It seems that operations that are totally cheap on HDFS are hellishly
>>>>>> expensive (and limited) on S3. It may be that you are affected by that.
>>>>>>
>>>>>>
>>>>>>
>>>>>> We are gradually trying to improve the behavior there and be more S3
>>>>>> aware.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11891&i=0>> wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> So do you mean that S3 is causing the stall , as I have mentioned in
>>>>>> my previous mail, I could not see any progress for 16minutes as 
>>>>>> checkpoints
>>>>>> were getting failed continuously.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>>>>>> List archive.]" <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11887&i=0>> wrote:
>>>>>>
>>>>>> Hi Vinay!
>>>>>>
>>>>>>
>>>>>>
>>>>>> True, the operator state (like Kafka) is currently not asynchronously
>>>>>> checkpointed.
>>>>>>
>>>>>>
>>>>>>
>>>>>> While it is rather small state, we have seen before that on S3 it can
>>>>>> cause trouble, because S3 frequently stalls uploads of even data amounts 
>>>>>> as
>>>>>> low as kilobytes due to its throttling policies.
>>>>>>
>>>>>>
>>>>>>
>>>>>> That would be a super important fix to add!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11885&i=0>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have attached a snapshot for reference:
>>>>>> As you can see all the 3 checkpointins failed , for checkpoint ID 2
>>>>>> and 3 it
>>>>>> is stuck at the Kafka source after 50%
>>>>>> (The data sent till now by Kafka source 1 is 65GB and sent by source
>>>>>> 2 is
>>>>>> 15GB )
>>>>>>
>>>>>> Within 10minutes 15M records were processed, and for the next
>>>>>> 16minutes the
>>>>>> pipeline is stuck , I don't see any progress beyond 15M because of
>>>>>> checkpoints getting failed consistently.
>>>>>>
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.na
>>>>>> bble.com/file/n11882/Checkpointing_Failed.png>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context: http://apache-flink-user-maili
>>>>>> ng-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-
>>>>>> RocksDB-as-statebackend-tp11752p11882.html
>>>>>>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>> archive at Nabble.com.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> *If you reply to this email, your message will be added to the
>>>>>> discussion below:*
>>>>>>
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>>> ble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp1175
>>>>>> 2p11885.html
>>>>>>
>>>>>> To start a new topic under Apache Flink User Mailing List archive.,
>>>>>> email [hidden email]
>>>>>> <http://user/SendEmail.jtp?type=node&node=11887&i=1>
>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>>>>>> here.
>>>>>> NAML
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> View this message in context: Re: Checkpointing with RocksDB as
>>>>>> statebackend
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11887.html>
>>>>>>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>> archive
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>>> at Nabble.com.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> *If you reply to this email, your message will be added to the
>>>>>> discussion below:*
>>>>>>
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>>> ble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp1175
>>>>>> 2p11891.html
>>>>>>
>>>>>> To start a new topic under Apache Flink User Mailing List archive.,
>>>>>> email [hidden email]
>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>>>>>> here.
>>>>>> NAML
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> View this message in context: Re: Checkpointing with RocksDB as
>>>>>> statebackend
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11913.html>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>> archive
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>>> at Nabble.com.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> *If you reply to this email, your message will be added to the
>>>>>> discussion below:*
>>>>>>
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>>>>> ble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp1175
>>>>>> 2p11943.html
>>>>>>
>>>>>> To start a new topic under Apache Flink User Mailing List archive.,
>>>>>> email [hidden email]
>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>>>>>> here.
>>>>>> NAML
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>>
>>>>>> View this message in context: Re: Checkpointing with RocksDB as
>>>>>> statebackend
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11949.html>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>> archive
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>>> at Nabble.com.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>> ------------------------------
>> If you reply to this email, your message will be added to the discussion
>> below:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-
>> tp11752p12209.html
>> To start a new topic under Apache Flink User Mailing List archive., email 
>> [hidden
>> email] <http:///user/SendEmail.jtp?type=node&node=12224&i=1>
>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>> NAML
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>
>
> ------------------------------
> View this message in context: Re: Checkpointing with RocksDB as
> statebackend
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p12224.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Reply via email to