@vinay - Flink needs to store all pending windows in the checkpoint, i.e., windows that have elements but have not yet been fired or purged.
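To make the point about pending windows concrete, here is a toy model in plain Java. It is not Flink code: the class and method names are made up for illustration, and a tumbling window is treated as fired as soon as the watermark reaches the window end, which is a simplification. It only counts how many items would sit in state at checkpoint time if each pending window buffers its whole input, versus if each pending window keeps a single running aggregate.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of pending-window state (hypothetical, not part of Flink). */
final class PendingWindowModel {

    /** Start of the tumbling window containing tsMs (tsMs assumed non-negative). */
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    /** Model assumption: a window is pending until the watermark reaches its end. */
    static boolean isPending(long tsMs, long sizeMs, long watermarkMs) {
        return windowStart(tsMs, sizeMs) + sizeMs > watermarkMs;
    }

    /** Items in state if every pending window buffers all of its elements. */
    static long bufferedElements(long[] timestampsMs, long sizeMs, long watermarkMs) {
        long count = 0;
        for (long ts : timestampsMs) {
            if (isPending(ts, sizeMs, watermarkMs)) {
                count++;
            }
        }
        return count;
    }

    /** Items in state if every pending window keeps one running aggregate. */
    static long aggregatedValues(long[] timestampsMs, long sizeMs, long watermarkMs) {
        Set<Long> pendingWindows = new HashSet<>();
        for (long ts : timestampsMs) {
            if (isPending(ts, sizeMs, watermarkMs)) {
                pendingWindows.add(windowStart(ts, sizeMs));
            }
        }
        return pendingWindows.size();
    }

    public static void main(String[] args) {
        long[] ts = {0, 100, 250, 900, 1000, 1500, 2100};
        // 1-second windows, watermark at 1000 ms: window [0, 1000) has fired.
        System.out.println("buffered:   " + bufferedElements(ts, 1000, 1000));
        System.out.println("aggregated: " + aggregatedValues(ts, 1000, 1000));
    }
}
```

In Flink terms, a window function that receives the full Iterable corresponds to the buffered case, so every element (the whole DTO) is part of the checkpoint until the window fires. As far as I know, an incrementally aggregating function such as a ReduceFunction corresponds to the second case, where one value per pending window is stored.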
I guess client-side encryption can add to the delay. If you use RocksDB asynchronous snapshots (1.1.x), then this delay should be hidden.

Greetings,
Stephan

On Tue, Sep 27, 2016 at 5:20 PM, vinay patil <vinay18.pa...@gmail.com> wrote:

> Hi Stephan,
>
> Ok, I think that may be taking a lot of time. When you say it stores
> everything, does that mean all the input to the window is stored in the
> state backend?
>
> For example, the input to my apply function is Iterable<DTO>; the
> Iterable can contain multiple elements, and the DTO has roughly 50 fields.
>
> So do you mean that the complete DTO will be stored in the state backend?
> If yes, then it's probably better to use RocksDB as the state backend.
>
> Also, I am using AWS Client Side Encryption for writing encrypted data to
> S3, so maybe that is also taking some time.
>
> What do you think?
>
> Regards,
> Vinay Patil
>
> On Tue, Sep 27, 2016 at 3:51 AM, Stephan Ewen wrote:
>
>> @vinay - Window operators store everything in the state backend.
>>
>> On Mon, Sep 26, 2016 at 7:34 PM, vinay patil wrote:
>>
>>> I am not sure about that; I will run the pipeline on the cluster and
>>> share the details.
>>> Since the window is a stateful operator, it will store only the key part
>>> in the state backend and not the value, right?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen wrote:
>>>
>>>> @vinay - Is it in your case large state that causes slower checkpoints?
>>>>
>>>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am also facing this issue. In my case the data is flowing
>>>>> continuously from the Kafka source; when I increase the checkpoint
>>>>> interval to 60000, the data gets written to the S3 sink.
>>>>>
>>>>> Is it because some operator is taking more time for processing? In my
>>>>> case I am using a time window of 1 sec.
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Please find my responses below.
>>>>>>
>>>>>> - What source are you using for the slow input?
>>>>>> [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>>> streams.
>>>>>>
>>>>>> - How large is the state that you are checkpointing?
>>>>>> [CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>> below.
>>>>>>
>>>>>>     final StreamExecutionEnvironment streamEnv =
>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>     streamEnv.setStateBackend(
>>>>>>         new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>     streamEnv.enableCheckpointing(10000);
>>>>>>
>>>>>> In terms of the state stored, the KS1 stream has a payload of
>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>> Basically the operators perform flatmaps on 8 fields of a tuple (all
>>>>>> fields are primitives). If you look at the state sizes in the
>>>>>> dashboard, they are in KB...
>>>>>>
>>>>>> - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>> [CVP] - There is no back pressure, at least judging from the sample
>>>>>> computation in the Flink dashboard. 100K/second is a low load by the
>>>>>> standard of Flink's benchmarks. I could not quite get the barriers vs
>>>>>> snapshot state distinction. I have attached the Task Manager log
>>>>>> (DEBUG) in case it is of interest.
>>>>>>
>>>>>> I have attached the checkpoint times as a .png from the dashboard.
>>>>>> Basically, if you look at checkpoint IDs 28, 29 and 30, you'd see
>>>>>> that the checkpoints take more than a minute in each case. Before
>>>>>> these checkpoints, the KS2 stream did not have any events. As soon as
>>>>>> an event (its size should be in bytes) was generated, the checkpoints
>>>>>> went slow, and subsequently took a minute more for every checkpoint
>>>>>> thereafter.
>>>>>>
>>>>>> This log was collected from the standalone Flink cluster with 1
>>>>>> job manager & 2 TMs. 1 TM was running this application with
>>>>>> checkpointing (parallelism=1).
>>>>>>
>>>>>> Please let me know if you need further info.
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>> information?
>>>>>>>
>>>>>>> - What source are you using for the slow input?
>>>>>>> - How large is the state that you are checkpointing?
>>>>>>> - Can you try to see in the log if actually the state snapshot
>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske wrote:
>>>>>>>
>>>>>>>> Hi CVP,
>>>>>>>>
>>>>>>>> I'm not that familiar with the internals of the checkpointing
>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga:
>>>>>>>>
>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>
>>>>>>>>> I have a stream application that has 2 stream sources as below.
>>>>>>>>>
>>>>>>>>>     KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>     KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>         ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>
>>>>>>>>>     ks1.connect(ks2).flatMap(X);
>>>>>>>>>     // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>     // from ks2 into a key-value state member. Elements from ks1
>>>>>>>>>     // are matched against that state. The CoFlatMapFunction
>>>>>>>>>     // operator maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>>
>>>>>>>>>     // ks1 is streaming about 100K events/sec from a Kafka topic
>>>>>>>>>     // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>
>>>>>>>>> Precisely when the 1st event is consumed from this stream, the
>>>>>>>>> checkpoint takes 2 minutes straight away.
>>>>>>>>>
>>>>>>>>> The version of Flink is 1.1.2.
>>>>>>>>>
>>>>>>>>> I tried to checkpoint every 10 secs using an FsStateBackend...
>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes
>>>>>>>>> in many cases, while in the other cases it frequently varies from
>>>>>>>>> 100 ms to 1.5 minutes. I'm attaching the snapshot of the dashboard
>>>>>>>>> for your reference.
>>>>>>>>>
>>>>>>>>> Is this an issue with Flink checkpointing?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>
>>>>>> Attachments:
>>>>>> flink_job_Plan.png (42K)
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>>>> Flink-Checkpoint-Times.png (65K)
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>>>> flink-qchavar-taskmanager-1-elxa1h67k32.log (442K)
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>>>
>>>>> ------------------------------
>>>>> View this message in context: Re: Flink Checkpoint runs slow for low
>>>>> load stream
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9179.html>
>>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.