Hi, I am also facing this issue. In my case the data flows continuously from the Kafka source; when I increase the checkpoint interval to 60000, the data gets written to the S3 sink.
Is it because some operator is taking more time to process? In my case, for example, I am using a time window of 1 sec.

Regards,
Vinay Patil

On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink User Mailing List archive.] <ml-node+s2336050n917...@n4.nabble.com> wrote:

> Hi Stefan,
>
> Please find my responses below.
>
> - What source are you using for the slow input?
>   [CVP] - Both streams, as pointed out in my first mail, are Kafka streams.
> - How large is the state that you are checkpointing?
>   [CVP] - I have enabled checkpointing on the StreamEnvironment as below.
>
>       final StreamExecutionEnvironment streamEnv =
>           StreamExecutionEnvironment.getExecutionEnvironment();
>       streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>       streamEnv.enableCheckpointing(10000);
>
>   In terms of the state stored, the KS1 stream has a payload of 100K events/second, while KS2 has about 1 event / 10 minutes. Basically the operators perform flatmaps on 8 fields of a tuple (all fields are primitives). If you look at the states' sizes in the dashboard, they are in Kb.
> - Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
>   [CVP] - There is no back pressure, at least from the sample computation in the Flink dashboard. 100K/second is a low load by Flink's benchmarks. I could not quite get the barriers vs. snapshot state distinction. I have attached the Task Manager log (DEBUG) in case it is of interest.
>
> I have attached the checkpoint times as a .png from the dashboard. Basically, if you look at checkpoint IDs 28, 29 and 30, you'd see that the checkpoints take more than a minute in each case. Before these checkpoints, the KS2 stream did not have any events.
> As soon as an event (should be in bytes) was generated, the checkpoints went slow and subsequently a minute more for every checkpoint thereafter.
>
> This log was collected from the standalone Flink cluster with 1 job manager & 2 TMs. 1 TM was running this application with checkpointing (parallelism=1).
>
> Please let me know if you need further info.
>
> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[hidden email]> wrote:
>
>> Hi!
>>
>> Let's try to figure that one out. Can you give us a bit more information?
>>
>> - What source are you using for the slow input?
>> - How large is the state that you are checkpointing?
>> - Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
>>
>> Greetings,
>> Stephan
>>
>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[hidden email]> wrote:
>>
>>> Hi CVP,
>>>
>>> I'm not so much familiar with the internals of the checkpointing system, but maybe Stephan (in CC) has an idea what's going on here.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <[hidden email]>:
>>>
>>>> Hi Aljoscha & Fabian,
>>>>
>>>> I have a stream application that has 2 stream sources as below.
>>>>
>>>>     KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>     KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>
>>>>     ks1.connect(ks2).flatMap(X);
>>>>     // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state.
>>>>     // The CoFlatMapFunction operator maintains ValueState<Tuple2<Long, Long>>.
>>>>
>>>>     // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>     // ks2 is streaming about 1 event every 10 minutes. Precisely when the 1st event is consumed from this stream, the checkpoint takes 2 minutes straight away.
>>>>
>>>> The version of Flink is 1.1.2.
>>>>
>>>> I tried to checkpoint every 10 secs using a FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes for many cases, while for the other cases it varies from 100 ms to 1.5 minutes frequently. I'm attaching the snapshot of the dashboard for your reference.
>>>>
>>>> Is this an issue with Flink checkpointing?
>>>>
>>>> Best Regards
>>>> CVP
>
> flink_job_Plan.png (42K) <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
> Flink-Checkpoint-Times.png (65K) <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
> flink-qchavar-taskmanager-1-elxa1h67k32.log (442K) <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9179.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
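
For readers trying to picture the operator X described in the quoted mail, here is a minimal plain-Java sketch of its matching logic. It is not Flink code and not CVP's implementation: the HashMap only stands in for Flink's per-key ValueState<Tuple2<Long, Long>>, the method names mirror CoFlatMapFunction's flatMap1/flatMap2, and the class name, the "null means remove" convention and the output format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the connected-stream operator described in the thread.
 * Assumption: the HashMap plays the role of keyed state; real Flink state
 * would be scoped to the current key by the runtime and checkpointed.
 */
class ConnectedStateSketch {

    // Stand-in for ValueState<Tuple2<Long, Long>>, one entry per key.
    private final Map<String, long[]> stateByKey = new HashMap<>();

    /** ks2 side (rare events): insert or remove the state for a key. */
    public void flatMap2(String key, Long first, Long second) {
        if (first == null || second == null) {
            // Hypothetical convention: a null field means "remove this key".
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, new long[] {first, second});
        }
    }

    /** ks1 side (high-rate events): emit only if the key currently has state. */
    public void flatMap1(String key, List<String> out) {
        long[] pair = stateByKey.get(key);
        if (pair != null) {
            out.add(key + " matched (" + pair[0] + ", " + pair[1] + ")");
        }
    }

    public static void main(String[] args) {
        ConnectedStateSketch op = new ConnectedStateSketch();
        List<String> out = new ArrayList<>();
        op.flatMap1("a", out);        // no state yet, nothing is emitted
        op.flatMap2("a", 1L, 2L);     // a ks2 event inserts state for key "a"
        op.flatMap1("a", out);        // the ks1 event now matches
        op.flatMap2("a", null, null); // a ks2 event removes the state again
        op.flatMap1("a", out);        // no match after removal
        System.out.println(out);
    }
}
```

The point of the model is only that the state stays empty until the first ks2 event arrives, which is the moment the thread reports checkpoints slowing down; it says nothing about why the checkpoint itself takes longer.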