Hi CVP,

we recently released Flink 1.1.4, the next bugfix release of the 1.1.x series, with major robustness improvements [1]. You might want to give 1.1.4 a try as well.
Best,
Fabian

[1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html

2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Stephan, All,

I just got a chance to try whether 1.1.3 fixes the slow checkpointing on the FS backend. It seems to have been fixed. Thanks for the fix.

While testing this with varying checkpoint intervals, there seem to be spikes of slow checkpoints every 30/40 seconds for an interval of 15 secs. The checkpoint time then lasts about 300 ms, as opposed to 10/20 ms. Basically, 15 secs seems to be the nominal value so far; anything below this interval triggers the spikes too often. For us, living with a 15-sec recovery is doable, and we eventually catch up on recovery.

Best Regards
CVP

On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Thanks for your prompt response, Stephan.

I'd wait for Flink 1.1.3!

Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:

The plan is to release 1.1.3 asap ;-)

We are waiting for the last backported patches to get in, then release testing and release.

If you want to test it today, you would need to manually build the release-1.1 branch.

Best,
Stephan

On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Gordon,

Do I need to clone and build the release-1.1 branch to test this? I currently use the Flink 1.1.2 runtime. When is 1.1.3 planned to be released?

Best Regards
Varaga

On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi,

Helping out here: this is the PR for async Kafka offset committing - https://github.com/apache/flink/pull/2574. It has already been merged into the master and release-1.1 branches, so you can try out the changes now if you'd like. The change should also be included in the 1.1.3 release, which the Flink community is discussing releasing soon.

It will definitely be helpful if you can provide feedback afterwards!

Best Regards,
Gordon

On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarth...@gmail.com) wrote:

Hi Stephan,

Is the async Kafka offset commit released in 1.1.3?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

That would be great. Let me know once the fix is done and which snapshot version to use; I'll check and revert then. Can you also share the JIRA that tracks the issue?

With regards to the offset commit issue, I'm not sure how to proceed here. Probably I'll use your fix first and see if the problem reoccurs.

Thanks much
Varaga
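As a side note on the interval tuning reported at the top of this thread (15 secs nominal, spikes below that): a minimal sketch of setting the checkpoint interval together with a minimum pause and a timeout, assuming the CheckpointConfig API as documented for the 1.x series. The class name, directory, and the 5 s / 60 s values are only illustrative.

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every 15 s, the interval reported as stable in this thread.
            env.enableCheckpointing(15000);
            env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

            // Leave some breathing room between checkpoints and bound how long one may take.
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
            env.getCheckpointConfig().setCheckpointTimeout(60000);

            // ... add sources and operators here and call env.execute() as usual.
        }
    }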
On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:

@CVP

In your case, Flink stores in its checkpoints only the Kafka offsets (a few bytes) and the custom state (e).

Here is an illustration of the checkpoint and what is stored (from the Flink docs):
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

I am quite puzzled why the offset committing problem occurs only for one input, and not for the other.

I am preparing a fix for 1.2, possibly going into 1.1.3 as well. Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan

On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

Thanks a million for your detailed explanation. I appreciate it.

- The zookeeper bundled with Kafka 0.9.0.1 was used to start zookeeper. There is only 1 instance (standalone) of zookeeper running on my localhost (Ubuntu 14.04).
- There is only 1 Kafka broker (version: 0.9.0.1).

With regards to the Flink cluster, there is only 1 JM & 2 TMs, started with no HA. I presume this does not use zookeeper anyway, as it runs as a standalone cluster.

BTW, the Kafka connector version that I use is as suggested on the Flink connectors page:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.1</version>
    </dependency>

Do you see any issues with the versions?

1) Do you have benchmarks w.r.t. checkpointing in Flink?

2) There isn't a detailed explanation of what states are stored as part of the checkpointing process. For example, if I have a pipeline like source -> map -> keyBy -> map -> sink, my assumption on what's stored is:

   a) The source stream's custom watermarked records
   b) Intermediate states of each of the transformations in the pipeline
   c) Delta of records stored from the previous sink
   d) Custom states (say, ValueState as in my case) - essentially this is what I care about storing
   e) All of my operators

Is my understanding right?

3) Is there a way in Flink to checkpoint only d) as stated above?

4) Can you apply checkpointing to only streams and certain operators (say I wish to store aggregated values that are part of the transformation)?

Best Regards
CVP

On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:

Thanks, the logs were very helpful!

TL;DR - The offset committing to ZooKeeper is very slow and prevents proper starting of checkpoints.

Here is what is happening in detail:

- Between the point when the TaskManager receives the "trigger checkpoint" message and the point when the KafkaSource actually starts the checkpoint, a long time passes (many seconds) - for one of the Kafka inputs (the other is fine).
- The only way this delay can be introduced is if another checkpoint-related operation (such as trigger() or notifyComplete()) is still in progress when the checkpoint is started. Flink does not perform concurrent checkpoint operations on a single operator, to ease the concurrency model for users.
- The operation that is still in progress must be the committing of the offsets (to ZooKeeper or Kafka). That also explains why this only happens once one side receives the first record. Before that, there is nothing to commit.

What Flink should fix:
- The KafkaConsumer should run the commit operations asynchronously, so as not to block the "notifyCheckpointComplete()" method.

What you can fix:
- Have a look at your Kafka/ZooKeeper setup. One Kafka input works well, the other does not. Do they go against different sets of brokers, or different ZooKeepers? Is the metadata for one input bad?
- In the next Flink version, you may opt out of committing offsets to Kafka/ZooKeeper altogether. It is not important for Flink's checkpoints anyway.

Greetings,
Stephan
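For reference, the opt-out Stephan mentions here is the kind of switch that later Kafka connector versions expose on the consumer itself. A minimal sketch, assuming setCommitOffsetsOnCheckpoints(boolean) is available in the version being upgraded to; the topic name, bootstrap servers, and group id are placeholders.

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class OffsetCommitOptOutSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(15000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "cvp-app");                 // placeholder

            FlinkKafkaConsumer09<String> consumer =
                    new FlinkKafkaConsumer09<>("ks1-topic", new SimpleStringSchema(), props);

            // Skip committing offsets back to Kafka/ZooKeeper on checkpoints; recovery
            // relies on the offsets stored in Flink's own checkpoints anyway.
            consumer.setCommitOffsetsOnCheckpoints(false);

            DataStream<String> ds1 = env.addSource(consumer);
            ds1.print();
            env.execute("offset-commit-opt-out-sketch");
        }
    }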
On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

Hi Stephan,

Please find my responses below.

- What source are you using for the slow input?
  [CVP] - Both streams, as pointed out in my first mail, are Kafka streams.

- How large is the state that you are checkpointing?
  [CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as below:

    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
    streamEnv.enableCheckpointing(10000);

  In terms of the state stored, the KS1 stream has a payload of 100K events/second, while KS2 has about 1 event / 10 minutes. Basically, the operators perform flatMaps on 8 fields of a tuple (all fields are primitives). If you look at the states' sizes in the dashboard, they are in KB.

- Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
  [CVP] - There is no backpressure, at least from the sample computation in the Flink dashboard. 100K/second is a low load for Flink's benchmarks. I could not quite get the barriers vs. snapshot state. I have attached the TaskManager log (DEBUG) info if that will interest you.

I have attached the checkpoint times as a .png from the dashboard. Basically, if you look at checkpoint IDs 28 & 29 & 30, you'd see that the checkpoints take more than a minute in each case. Before these checkpoints, the KS2 stream did not have any events. As soon as an event (should be in bytes) was generated, the checkpoints went slow, and subsequently a minute more for every checkpoint thereafter.

This log was collected from the standalone Flink cluster with 1 JobManager & 2 TMs. 1 TM was running this application with checkpointing (parallelism=1).

Please let me know if you need further info.
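One side note on the backend configuration quoted above, not specific to the slowdown: the FsStateBackend checkpoint directory must be reachable by the JobManager and every TaskManager, so a local file:///tmp path is only suitable while everything runs on one machine, as here. A minimal sketch of pointing it at a shared filesystem instead; the HDFS path is a placeholder.

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SharedCheckpointDirSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // On a real multi-node cluster, use a directory that all JMs/TMs can reach
            // (e.g. HDFS or NFS) instead of a machine-local /tmp path.
            streamEnv.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
            streamEnv.enableCheckpointing(10000);
        }
    }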
On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:

Hi!

Let's try to figure that one out. Can you give us a bit more information?

- What source are you using for the slow input?
- How large is the state that you are checkpointing?
- Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?

Greetings,
Stephan

On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi CVP,

I'm not so familiar with the internals of the checkpointing system, but maybe Stephan (in CC) has an idea of what's going on here.

Best, Fabian

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

Hi Aljoscha & Fabian,

I have a streaming application that has 2 stream sources, as below:

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);

    ks1.connect(ks2).flatMap(X);

    // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
    // key-value state member. Elements from ks1 are matched against that state.
    // The CoFlatMapFunction operator maintains ValueState<Tuple2<Long, Long>>.

    // ks1 is streaming about 100K events/sec from a Kafka topic.
    // ks2 is streaming about 1 event every 10 minutes. Precisely when the 1st event
    // is consumed from this stream, the checkpoint takes 2 minutes straight away.

The version of Flink is 1.1.2.

I tried to checkpoint every 10 secs using an FsStateBackend... What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in the other cases it frequently varies from 100 ms to 1.5 minutes. I'm attaching a snapshot of the dashboard for your reference.

Is this an issue with Flink checkpointing?

Best Regards
CVP
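To make the shape of operator X concrete: a minimal sketch of a CoFlatMapFunction that keeps a ValueState<Tuple2<Long, Long>> per key, updates it from the ks2 side, and matches ks1 elements against it. This only illustrates the pattern described above; the class name, types, and matching logic are placeholders rather than CVP's actual code, and the state descriptor constructor varies slightly between Flink versions.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Sketch of operator X: keyed ValueState holds the latest pair seen on the
    // low-volume ks2 side; high-volume ks1 elements are matched against it.
    public class MatchFunction extends RichCoFlatMapFunction<String, Tuple2<String, Long>, String> {

        private transient ValueState<Tuple2<Long, Long>> pairState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                    "ks2-state",
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                    null); // default value of the state before the first update
            pairState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap1(String ks1Event, Collector<String> out) throws Exception {
            // ks1 side: match the incoming event against the stored state, if any.
            Tuple2<Long, Long> pair = pairState.value();
            if (pair != null) {
                out.collect(ks1Event + " matched " + pair);
            }
        }

        @Override
        public void flatMap2(Tuple2<String, Long> ks2Event, Collector<String> out) throws Exception {
            // ks2 side: insert or replace the per-key state (call pairState.clear() to remove).
            pairState.update(Tuple2.of(ks2Event.f1, System.currentTimeMillis()));
        }
    }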