BRILLIANT !!! Checkpoint times are consistent with 1.1.4...
Thanks for your formidable support!

Best Regards,
CVP

On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi CVP,

we recently released Flink 1.1.4, i.e., the next bugfix release of the 1.1.x series, with major robustness improvements [1]. You might want to give 1.1.4 a try as well.

Best, Fabian

[1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html

2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Stephan, All,

I just got a chance to try whether 1.1.3 fixes the slow checkpointing on the FS backend. It seems to have been fixed. Thanks for the fix.

While testing with varying checkpoint intervals, there seem to be spikes of slow checkpoints every 30-40 seconds for an interval of 15 seconds. The checkpoint time is then about 300 ms, as opposed to 10-20 ms. Basically, 15 seconds seems to be the nominal value so far; anything below this interval triggers the spikes too often. For us, living with a 15-second recovery interval is doable, and we can eventually catch up on recovery.

Best Regards,
CVP

On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Thanks for your prompt response, Stephan.

I'd wait for Flink 1.1.3!

Best Regards,
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:

The plan is to release 1.1.3 asap ;-)

We are waiting for the last backported patches to get in, then release testing and release.

If you want to test it today, you would need to manually build the release-1.1 branch.

Best,
Stephan

On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Gordon,

Do I need to clone and build the release-1.1 branch to test this? I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned for release?

Best Regards,
Varaga

On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi,

Helping out here: this is the PR for async Kafka offset committing - https://github.com/apache/flink/pull/2574.
It has already been merged into the master and release-1.1 branches, so you can try out the changes now if you'd like. The change should also be included in the 1.1.3 release, which the Flink community is discussing releasing soon.

It will definitely be helpful if you can provide feedback afterwards!

Best Regards,
Gordon

On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarth...@gmail.com) wrote:

Hi Stephan,

Is the async Kafka offset commit released in 1.1.3?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

That would be great. Let me know once the fix is done and which snapshot version to use; I'll check and get back to you then. Can you also share the JIRA that tracks the issue?

With regard to the offset commit issue, I'm not sure how to proceed here. I'll probably use your fix first and see if the problem reoccurs.

Thanks much,
Varaga
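For reference, once the consumer exposes the opt-out switch (setCommitOffsetsOnCheckpoints(...) on the Kafka consumer, available in releases after 1.1), disabling the external offset commit looks roughly like the minimal sketch below. The broker address, topic name and group id are hypothetical; the offsets stored inside Flink's own checkpoints are not affected by this setting.

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class NoOffsetCommitSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.setProperty("group.id", "cvp-demo");                // hypothetical group id

            FlinkKafkaConsumer09<String> consumer =
                    new FlinkKafkaConsumer09<>("ks1-topic", new SimpleStringSchema(), props);

            // Skips only the external commit back to Kafka/ZooKeeper; the offsets
            // snapshotted in Flink checkpoints (used for recovery) stay intact.
            consumer.setCommitOffsetsOnCheckpoints(false);

            DataStream<String> ks1 = env.addSource(consumer);
            ks1.print();
            env.execute("no-offset-commit-sketch");
        }
    }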
On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:

@CVP

Flink stores in checkpoints, in your case, only the Kafka offsets (a few bytes) and the custom state (d).

Here is an illustration of the checkpoint and what is stored (from the Flink docs):
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

I am quite puzzled why the offset committing problem occurs only for one input, and not for the other. I am preparing a fix for 1.2, possibly going into 1.1.3 as well. Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan

On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

Thanks a million for your detailed explanation. I appreciate it.

- The ZooKeeper bundled with Kafka 0.9.0.1 was used to start ZooKeeper. There is only one instance (standalone) of ZooKeeper running on my localhost (Ubuntu 14.04).
- There is only one Kafka broker (version 0.9.0.1).

With regard to the Flink cluster, there is only 1 JM and 2 TMs, started with no HA. I presume this does not use ZooKeeper anyway, as it runs as a standalone cluster.

BTW, the Kafka connector version that I use is as suggested on the Flink connectors page:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.1</version>
    </dependency>

Do you see any issues with the versions?

1) Do you have benchmarks for checkpointing in Flink?

2) There isn't a detailed explanation of which states are stored as part of the checkpointing process. For example, if I have a pipeline like source -> map -> keyBy -> map -> sink, my assumption on what's stored is:

   a) the source stream's custom watermarked records
   b) intermediate states of each of the transformations in the pipeline
   c) the delta of records stored from the previous sink
   d) custom states (say ValueState, as in my case) - essentially this is what I care about storing
   e) all of my operators

   Is my understanding right?

3) Is there a way in Flink to checkpoint only d) as stated above?

4) Can you apply checkpointing to only certain streams and operators (say I wish to store aggregated values as part of the transformation)?

Best Regards,
CVP
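Regarding 2) and 3): with the FsStateBackend, a checkpoint essentially contains the source offsets plus whatever state the operators explicitly register, so the custom state in d) is already the only user state that gets snapshotted; plain fields of an operator are not. A minimal sketch of registering such state in a rich function on a keyed stream (hypothetical state name and types, using the 1.1-era descriptor constructor) looks like this:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Only state registered like this (plus the source offsets) ends up in the
    // checkpoint; ordinary member fields of the operator are not snapshotted.
    public class CountingFlatMap extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Flink 1.1-style descriptor (name, type, default value); hypothetical state name.
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("event-count", Long.class, 0L));
        }

        @Override
        public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
            long newCount = count.value() + 1;   // read the keyed state for the current key
            count.update(newCount);              // this value is what the checkpoint stores
            out.collect(Tuple2.of(in.f0, newCount));
        }
    }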
On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:

Thanks, the logs were very helpful!

TL;DR - The offset committing to ZooKeeper is very slow and prevents proper starting of checkpoints.

Here is what is happening in detail:

- Between the point when the TaskManager receives the "trigger checkpoint" message and the point when the KafkaSource actually starts the checkpoint, a long time passes (many seconds) - for one of the Kafka inputs (the other is fine).
- The only way this delay can be introduced is if another checkpoint-related operation (such as trigger() or notifyCheckpointComplete()) is still in progress when the checkpoint is started. Flink does not perform concurrent checkpoint operations on a single operator, to ease the concurrency model for users.
- The operation that is still in progress must be the committing of the offsets (to ZooKeeper or Kafka). That also explains why this only happens once one side receives the first record; before that, there is nothing to commit.

What Flink should fix:
- The KafkaConsumer should run the commit operations asynchronously, to not block the "notifyCheckpointComplete()" method.

What you can fix:
- Have a look at your Kafka/ZooKeeper setup. One Kafka input works well, the other does not. Do they go against different sets of brokers, or different ZooKeepers? Is the metadata for one input bad?
- In the next Flink version, you may opt out of committing offsets to Kafka/ZooKeeper altogether. It is not important for Flink's checkpoints anyway.

Greetings,
Stephan
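The fix referenced earlier in the thread (PR #2574) follows this idea of moving the commit off the checkpoint path. As a generic illustration only, not the actual Flink implementation, the pattern of handing a blocking commit to a background thread so the checkpoint callback returns immediately looks like this:

    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Generic sketch of the "commit asynchronously" idea: the slow, blocking
    // ZooKeeper/Kafka commit runs on a background thread so the checkpoint
    // callback returns immediately and later checkpoints are not delayed.
    public class AsyncCommitSketch {

        private final ExecutorService committer = Executors.newSingleThreadExecutor();

        // called when a checkpoint completes (analogous to notifyCheckpointComplete())
        public void onCheckpointComplete(Map<Integer, Long> partitionOffsets) {
            committer.submit(() -> commitExternally(partitionOffsets)); // does not block the caller
        }

        private void commitExternally(Map<Integer, Long> partitionOffsets) {
            // hypothetical placeholder for the blocking commit call
            System.out.println("committing offsets: " + partitionOffsets);
        }

        public void close() {
            committer.shutdown();
        }
    }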
On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

Please find my responses below.

- What source are you using for the slow input?
[CVP] - Both streams, as pointed out in my first mail, are Kafka streams.

- How large is the state that you are checkpointing?
[CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as below:

    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
    streamEnv.enableCheckpointing(10000);

In terms of the state stored, the KS1 stream has a payload of 100K events/second, while KS2 has about 1 event every 10 minutes. Basically, the operators perform flatMaps on 8 fields of a tuple (all fields are primitives). If you look at the states' sizes in the dashboard, they are in the KB range.

- Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
[CVP] - There is no backpressure, at least from the sample computation in the Flink dashboard; 100K/second is a low load by Flink's benchmarks. I could not quite tell the barrier travel time apart from the snapshot time. I have attached the TaskManager log (DEBUG) in case it is of interest to you.

I have attached the checkpoint times as a .png from the dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd see that the checkpoints take more than a minute in each case. Before these checkpoints, the KS2 stream did not have any events. As soon as an event (it should only be a few bytes) was generated, the checkpoints went slow, and subsequently every checkpoint thereafter took a minute or more.

This log was collected from the standalone Flink cluster with 1 JobManager and 2 TMs. One TM was running this application with checkpointing enabled (parallelism = 1).

Please let me know if you need further info.
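As a side note on the configuration above: when individual checkpoints can take longer than the trigger interval, a minimum pause and a timeout keep slow checkpoints from piling up. A small sketch, assuming the CheckpointConfig API available in the 1.1 line (the interval, pause and timeout values are illustrative):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointPacingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
            env.enableCheckpointing(10000); // attempt a checkpoint every 10 s, as in the job above

            // Wait at least 5 s after a checkpoint completes before starting the next,
            // and abort any checkpoint that takes longer than 2 minutes.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
            env.getCheckpointConfig().setCheckpointTimeout(120000);

            // trivial placeholder pipeline so the sketch executes
            env.fromElements(1L, 2L, 3L).print();
            env.execute("checkpoint-pacing-sketch");
        }
    }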
On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:

Hi!

Let's try to figure that one out. Can you give us a bit more information?

- What source are you using for the slow input?
- How large is the state that you are checkpointing?
- Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?

Greetings,
Stephan

On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi CVP,

I'm not so familiar with the internals of the checkpointing system, but maybe Stephan (in CC) has an idea what's going on here.

Best, Fabian

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Aljoscha & Fabian,

I have a stream application that has 2 stream sources as below:

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(/* split T into k-v pairs */).keyBy(0);

    ks1.connect(ks2).flatMap(X);
    // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
    // key-value state member. Elements from ks1 are matched against that state.
    // The CoFlatMapFunction operator maintains ValueState<Tuple2<Long, Long>>.

    // ks1 is streaming about 100K events/sec from a Kafka topic.
    // ks2 is streaming about 1 event every 10 minutes. Precisely when the 1st event
    // is consumed from this stream, a checkpoint takes 2 minutes straight away.

The version of Flink is 1.1.2.

I tried to checkpoint every 10 seconds using a FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in other cases it frequently varies from 100 ms to 1.5 minutes. I'm attaching a snapshot of the dashboard for your reference.

Is this an issue with Flink checkpointing?

Best Regards,
CVP
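A self-contained sketch of the two-stream pattern described above - one keyed stream matched against keyed ValueState that the other stream maintains - is shown below. The types, keys and element values are hypothetical stand-ins for the real Kafka-backed ks1/ks2, not the author's actual job:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class ConnectedStateSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10000);

            // Hypothetical stand-ins for the two Kafka-backed streams.
            DataStream<Tuple2<String, Long>> events =
                    env.fromElements(Tuple2.of("deviceA", 1L), Tuple2.of("deviceB", 2L));
            DataStream<Tuple2<String, Long>> control =
                    env.fromElements(Tuple2.of("deviceA", 42L));

            events.keyBy(0)                       // ks1: keyed by the first tuple field
                  .connect(control.keyBy(0))      // ks2: keyed the same way
                  .flatMap(new MatchAgainstState())
                  .print();

            env.execute("connected-state-sketch");
        }

        // Keeps one ValueState per key, written from the control stream and
        // read when elements of the event stream arrive.
        public static class MatchAgainstState
                extends RichCoFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> {

            private transient ValueState<Long> reference;

            @Override
            public void open(Configuration parameters) {
                reference = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("reference", Long.class, null));
            }

            @Override
            public void flatMap1(Tuple2<String, Long> event, Collector<String> out) throws Exception {
                Long ref = reference.value();
                if (ref != null) {
                    out.collect(event.f0 + " matched " + ref);  // event matched against state
                }
            }

            @Override
            public void flatMap2(Tuple2<String, Long> ctrl, Collector<String> out) throws Exception {
                reference.update(ctrl.f1);                      // control element updates keyed state
            }
        }
    }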