@vinay - Flink needs to store all pending windows in the checkpoint, i.e.,
windows that have elements but have not yet fired or been purged.
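
A toy model in plain Java (not Flink code) of why that matters for
checkpoint size: a window that hands all of its elements to the window
function has to keep every element while it is pending, whereas a window
that pre-aggregates (as far as I know, what Flink does when a ReduceFunction
is supplied) keeps only the running result. The class and method names are
made up for illustration, and the 50-field record mirrors the DTO size
mentioned further down in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: counts how many values a pending window keeps in state.
class PendingWindowModel {

    // Keeps every element until the window fires (like handing an Iterable to apply()).
    static final class BufferingWindow {
        private final List<long[]> elements = new ArrayList<>();

        void add(long[] record) {
            elements.add(record);
        }

        long valuesInState() {
            long count = 0;
            for (long[] record : elements) {
                count += record.length;
            }
            return count;
        }
    }

    // Keeps one running sum per field (like an incremental reduce).
    static final class ReducingWindow {
        private long[] sums;

        void add(long[] record) {
            if (sums == null) {
                sums = record.clone();
                return;
            }
            for (int i = 0; i < sums.length; i++) {
                sums[i] += record[i];
            }
        }

        long valuesInState() {
            return sums == null ? 0 : sums.length;
        }
    }

    public static void main(String[] args) {
        BufferingWindow buffering = new BufferingWindow();
        ReducingWindow reducing = new ReducingWindow();
        for (int i = 0; i < 1000; i++) {
            long[] record = new long[50]; // hypothetical 50-field record
            buffering.add(record);
            reducing.add(record);
        }
        System.out.println("buffering window keeps " + buffering.valuesInState() + " values");
        System.out.println("reducing window keeps " + reducing.valuesInState() + " values");
    }
}
```

With 1000 records of 50 fields each, the buffering window holds 50000 values
at checkpoint time and the reducing window holds 50.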

I guess client-side encryption can add to the delay.
If you use RocksDB asynchronous snapshots (1.1.x), this delay should be
hidden.
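
A rough sketch of the idea behind an asynchronous snapshot, again as a toy
model in plain Java rather than Flink or RocksDB code: the processing thread
only takes a copy of the state, and the slow part (standing in here for
encryption plus the S3 write) runs on another thread. All names are
hypothetical, and copying a list is only a stand-in; it is not a claim about
how RocksDB takes its snapshot internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Toy model, NOT Flink code: the slow upload does not block event processing.
class AsyncSnapshotModel {
    private final List<String> state = new ArrayList<>();

    void process(String event) {
        state.add(event);
    }

    int liveSize() {
        return state.size();
    }

    // Synchronous phase: copy the state on the caller's thread.
    // Asynchronous phase: the "upload" waits on the latch in a background thread.
    CompletableFuture<Integer> snapshotAsync(CountDownLatch uploadMayFinish) {
        List<String> copy = new ArrayList<>(state);
        return CompletableFuture.supplyAsync(() -> {
            try {
                uploadMayFinish.await(); // stands in for a slow, encrypted upload
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return copy.size(); // number of entries written by this snapshot
        });
    }

    public static void main(String[] args) {
        AsyncSnapshotModel operator = new AsyncSnapshotModel();
        operator.process("a");
        operator.process("b");

        CountDownLatch slowUpload = new CountDownLatch(1);
        CompletableFuture<Integer> snapshot = operator.snapshotAsync(slowUpload);

        operator.process("c"); // processing continues while the upload is pending
        slowUpload.countDown(); // let the upload finish

        System.out.println("snapshot entries: " + snapshot.join()
                + ", live entries: " + operator.liveSize());
    }
}
```

The snapshot contains the two entries that existed when it was triggered,
while the operator has already processed a third one by the time the upload
completes.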

Greetings,
Stephan


On Tue, Sep 27, 2016 at 5:20 PM, vinay patil <vinay18.pa...@gmail.com>
wrote:

> Hi Stephan,
>
> OK, I think that may be taking a lot of time. When you say it stores
> everything, do you mean that all the input to the window is stored in the
> state backend?
>
> For example, the input to my apply function is Iterable<DTO>, which can
> contain multiple elements, and the DTO contains roughly 50 fields.
>
> So do you mean that the complete DTO will be stored in the state backend?
> If yes, then it's probably better to use RocksDB as the state backend.
>
> Also, I am using AWS Client Side Encryption for writing encrypted data to
> S3, so maybe that is also taking some time.
>
> What do you think?
>
> Regards,
> Vinay Patil
>
> On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] wrote:
>
>> @vinay - Window operators store everything in the state backend.
>>
>> On Mon, Sep 26, 2016 at 7:34 PM, vinay patil wrote:
>>
>>> I am not sure about that; I will run the pipeline on the cluster and
>>> share the details.
>>> Since a window is a stateful operator, it will store only the key part in
>>> the state backend and not the value, right?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User
>>> Mailing List archive.] wrote:
>>>
>>>> @vinay - Is it in your case large state that causes slower checkpoints?
>>>>
>>>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am also facing this issue. In my case the data is flowing
>>>>> continuously from the Kafka source; when I increase the checkpoint
>>>>> interval to 60000, the data gets written to the S3 sink.
>>>>>
>>>>> Is it because some operator is taking more time for processing? For
>>>>> example, in my case I am using a time window of 1 sec.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache
>>>>> Flink User Mailing List archive.] wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>     Please find my responses below.
>>>>>>
>>>>>>   - What source are you using for the slow input?
>>>>>>     [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>>>     streams.
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>     [CVP] - I have enabled checkpointing on the
>>>>>>     StreamExecutionEnvironment as below:
>>>>>>
>>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         streamEnv.setStateBackend(
>>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>>
>>>>>>     In terms of the state stored, the KS1 stream has a payload of
>>>>>>     100K events/second, while KS2 has about 1 event / 10 minutes.
>>>>>>     Basically, the operators perform flatMaps on 8 fields of a tuple
>>>>>>     (all fields are primitives). If you look at the state sizes in the
>>>>>>     dashboard, they are in KB.
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>     takes that long, or if it simply takes long for the checkpoint
>>>>>>     barriers to travel through the stream due to a lot of backpressure?
>>>>>>     [CVP] - There is no backpressure, at least according to the sample
>>>>>>     computation in the Flink dashboard. 100K/second is a low load by
>>>>>>     Flink's benchmarks. I could not quite follow the distinction
>>>>>>     between barriers and state snapshots. I have attached the Task
>>>>>>     Manager log (DEBUG) in case it is of interest.
>>>>>>
>>>>>>     I have attached the checkpoint times as a .png from the dashboard.
>>>>>>     If you look at checkpoint IDs 28, 29 and 30, you'd see that the
>>>>>>     checkpoints take more than a minute in each case. Before these
>>>>>>     checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>     event (its size should be in bytes) was generated, the checkpoints
>>>>>>     went slow, and subsequently took a minute more for every checkpoint
>>>>>>     thereafter.
>>>>>>
>>>>>>     This log was collected from the standalone Flink cluster with 1
>>>>>>     job manager and 2 TMs. 1 TM was running this application with
>>>>>>     checkpointing (parallelism=1).
>>>>>>
>>>>>>     Please let me know if you need further info.
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>> information?
>>>>>>>
>>>>>>>   - What source are you using for the slow input?
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>     takes that long, or if it simply takes long for the checkpoint
>>>>>>>     barriers to travel through the stream due to a lot of backpressure?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske wrote:
>>>>>>>
>>>>>>>> Hi CVP,
>>>>>>>>
>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga:
>>>>>>>>
>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>
>>>>>>>>>     I have a stream application that has 2 stream sources, as below.
>>>>>>>>>
>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>
>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>      // from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>>      // matched against that state. The CoFlatMapFunction operator
>>>>>>>>>      // maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>>
>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes. Precisely
>>>>>>>>>      // when the 1st event is consumed from this stream, the
>>>>>>>>>      // checkpoint takes 2 minutes straight away.
>>>>>>>>>
>>>>>>>>>     The version of Flink is 1.1.2.
>>>>>>>>>
>>>>>>>>> I tried checkpointing every 10 seconds using an FsStateBackend.
>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>>>> many cases, while in the other cases it frequently varies from
>>>>>>>>> 100 ms to 1.5 minutes. I'm attaching a snapshot of the dashboard for
>>>>>>>>> your reference.
>>>>>>>>>
>>>>>>>>>      Is this an issue with Flink checkpointing?
>>>>>>>>>
>>>>>>>>>  Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> *flink_job_Plan.png* (42K) Download Attachment
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>>>> *Flink-Checkpoint-Times.png* (65K) Download Attachment
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>>>> *flink-qchavar-taskmanager-1-elxa1h67k32.log* (442K) Download
>>>>>> Attachment
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> If you reply to this email, your message will be added to the
>>>>>> discussion below:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9176.html
>>>>>>
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> View this message in context: Re: Flink Checkpoint runs slow for low
>>>>> load stream
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9179.html>
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>> at Nabble.com.
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>
