The plan is to release 1.1.3 ASAP ;-) We are waiting for the last backported patches to get in, then release testing and the release itself.

If you want to test it today, you would need to manually build the release-1.1 branch.

Best,
Stephan

On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:

> Hi Gordon,
>
> Do I need to clone and build the release-1.1 branch to test this?
> I currently use the Flink 1.1.2 runtime. When is the plan to release it in 1.1.3?
>
> Best Regards
> Varaga
>
> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi,
>>
>> Helping out here: this is the PR for async Kafka offset committing -
>> https://github.com/apache/flink/pull/2574.
>> It has already been merged into the master and release-1.1 branches, so
>> you can try out the changes now if you'd like.
>> The change should also be included in the 1.1.3 release, which the Flink
>> community is discussing releasing soon.
>>
>> It will definitely be helpful if you can provide feedback afterwards!
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarth...@gmail.com) wrote:
>>
>> Hi Stephan,
>>
>> Is the async Kafka offset commit released in 1.1.3?
>>
>> Varaga
>>
>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> That would be great. Let me know once the fix is done and which
>>> snapshot version to use; I'll check and revert then.
>>> Can you also share the JIRA that tracks the issue?
>>>
>>> With regards to the offset commit issue, I'm not sure how to
>>> proceed here. Probably I'll use your fix first and see if the problem
>>> reoccurs.
>>>
>>> Thanks much
>>> Varaga
>>>
>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> @CVP
>>>>
>>>> In your case, Flink stores in checkpoints only the Kafka offsets (a few
>>>> bytes) and the custom state (e).
>>>>
>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>> Flink docs):
>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>
>>>> I am quite puzzled why the offset committing problem occurs only for
>>>> one input, and not for the other.
>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Thanks a million for your detailed explanation. I appreciate it.
>>>>>
>>>>> - The zookeeper bundled with Kafka 0.9.0.1 was used to start
>>>>>   zookeeper. There is only 1 instance (standalone) of zookeeper
>>>>>   running on my localhost (Ubuntu 14.04).
>>>>> - There is only 1 Kafka broker (version: 0.9.0.1).
>>>>>
>>>>> With regards to the Flink cluster, there is only 1 JM & 2 TMs, started
>>>>> with no HA. I presume this does not use zookeeper anyway, as it runs as
>>>>> a standalone cluster.
>>>>>
>>>>> BTW, the Kafka connector version that I use is as suggested on
>>>>> the Flink connectors page:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>     <version>1.1.1</version>
>>>>> </dependency>
>>>>>
>>>>> Do you see any issues with the versions?
>>>>>
>>>>> 1) Do you have benchmarks w.r.t. checkpointing in Flink?
>>>>>
>>>>> 2) There isn't a detailed explanation of what states are stored as
>>>>> part of the checkpointing process. For example, if I have a pipeline
>>>>> like source -> map -> keyBy -> map -> sink, my assumption on what's
>>>>> stored is:
>>>>>
>>>>> a) The source stream's custom watermarked records
>>>>> b) Intermediate states of each of the transformations in the pipeline
>>>>> c) Delta of records stored from the previous sink
>>>>> d) Custom states (say, ValueState as in my case) - essentially this is
>>>>>    what I care about storing
>>>>> e) All of my operators
>>>>>
>>>>> Is my understanding right?
>>>>>
>>>>> 3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>>
>>>>> 4) Can you apply checkpointing to only certain streams and certain
>>>>> operators (say I wish to store aggregated values as part of the
>>>>> transformation)?
>>>>>
>>>>> Best Regards
>>>>> CVP
>>>>>
>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Thanks, the logs were very helpful!
>>>>>>
>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>> proper starting of checkpoints.
>>>>>>
>>>>>> Here is what is happening in detail:
>>>>>>
>>>>>> - Between the point when the TaskManager receives the "trigger
>>>>>> checkpoint" message and the point when the KafkaSource actually starts
>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka
>>>>>> inputs (the other is fine).
>>>>>> - The only way this delay can be introduced is if another
>>>>>> checkpoint-related operation (such as trigger() or notifyComplete())
>>>>>> is still in progress when the checkpoint is started. Flink does not
>>>>>> perform concurrent checkpoint operations on a single operator, to ease
>>>>>> the concurrency model for users.
>>>>>> - The operation that is still in progress must be the committing of
>>>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>> happens once one side receives the first record. Before that, there is
>>>>>> nothing to commit.
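The blocking behavior diagnosed above - a slow offset commit still running inside the checkpoint callback when the next checkpoint triggers - and the asynchronous remedy can be sketched independently of Flink's actual classes. This is an illustrative stand-alone sketch, not Flink's API; `AsyncOffsetCommitter` and `slowCommit` are made-up names, and the sleep stands in for a slow ZooKeeper write:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: offload the slow commit to a background thread
// so the checkpoint callback returns immediately instead of blocking.
class AsyncOffsetCommitter {
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final AtomicLong lastCommitted = new AtomicLong(-1);

    // Called on the checkpoint thread; must not block.
    void notifyCheckpointComplete(long offset) {
        committer.submit(() -> {
            slowCommit(offset);        // stand-in for a slow ZooKeeper write
            lastCommitted.set(offset);
        });
    }

    private void slowCommit(long offset) {
        try {
            Thread.sleep(50); // simulated latency; real commits took seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    long lastCommitted() {
        return lastCommitted.get();
    }

    void shutdown() {
        committer.shutdown();
        try {
            committer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the commit runs on its own thread, the next checkpoint can start without waiting for the previous commit to finish - which is exactly the property the blocking version lacks.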
>>>>>> What Flink should fix:
>>>>>> - The KafkaConsumer should run the commit operations asynchronously,
>>>>>> so as not to block the "notifyCheckpointComplete()" method.
>>>>>>
>>>>>> What you can fix:
>>>>>> - Have a look at your Kafka/ZooKeeper setup. One Kafka input works
>>>>>> well, the other does not. Do they go against different sets of
>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>> - In the next Flink version, you may opt out of committing offsets
>>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>> checkpoints anyway.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> Please find my responses below.
>>>>>>>
>>>>>>> - What source are you using for the slow input?
>>>>>>> [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>>>> streams.
>>>>>>>
>>>>>>> - How large is the state that you are checkpointing?
>>>>>>> [CVP] - I have enabled checkpointing on the
>>>>>>> StreamExecutionEnvironment as below:
>>>>>>>
>>>>>>> final StreamExecutionEnvironment streamEnv =
>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> streamEnv.setStateBackend(
>>>>>>>     new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>> streamEnv.enableCheckpointing(10000);
>>>>>>>
>>>>>>> In terms of the state stored: the KS1 stream has a payload of 100K
>>>>>>> events/second, while KS2 has about 1 event / 10 minutes... Basically
>>>>>>> the operators perform flatMaps on 8 fields of a tuple (all fields are
>>>>>>> primitives). If you look at the states' sizes in the dashboard, they
>>>>>>> are in KB...
>>>>>>>
>>>>>>> - Can you try to see in the log if the state snapshot actually takes
>>>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>> [CVP] - There is no backpressure, at least from the sample
>>>>>>> computation in the Flink dashboard. 100K/second is a low load by
>>>>>>> Flink's benchmarks. I could not quite work out the barriers vs.
>>>>>>> snapshot state question. I have attached the TaskManager log (DEBUG)
>>>>>>> in case it is of interest.
>>>>>>>
>>>>>>> I have attached the checkpoint times as a .png from the dashboard.
>>>>>>> Basically, if you look at checkpoint IDs 28, 29 & 30, you'd see that
>>>>>>> the checkpoints take more than a minute in each case. Before these
>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>> event (a few bytes) was generated, the checkpoints went slow, and
>>>>>>> subsequently took a minute more for every checkpoint thereafter.
>>>>>>>
>>>>>>> This log was collected from the standalone Flink cluster with 1
>>>>>>> JobManager & 2 TMs. 1 TM was running this application with
>>>>>>> checkpointing (parallelism=1).
>>>>>>>
>>>>>>> Please let me know if you need further info.
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>> information?
>>>>>>>>
>>>>>>>> - What source are you using for the slow input?
>>>>>>>> - How large is the state that you are checkpointing?
>>>>>>>> - Can you try to see in the log if the state snapshot actually takes
>>>>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi CVP,
>>>>>>>>>
>>>>>>>>> I'm not so familiar with the internals of the checkpointing
>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>
>>>>>>>>>> I have a stream application that has 2 stream sources, as below:
>>>>>>>>>>
>>>>>>>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>> KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>     ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>
>>>>>>>>>> ks1.connect(ks2).flatMap(X);
>>>>>>>>>> // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>> // from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>>> // matched against that state. The CoFlatMapFunction operator
>>>>>>>>>> // maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>
>>>>>>>>>> // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>> // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>> // Precisely when the 1st event is consumed from this stream,
>>>>>>>>>> // the checkpoint takes 2 minutes straight away.
>>>>>>>>>>
>>>>>>>>>> The version of Flink is 1.1.2.
>>>>>>>>>>
>>>>>>>>>> I tried to checkpoint every 10 secs using an FsStateBackend...
>>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes
>>>>>>>>>> in many cases, while in the other cases it varies from 100 ms to
>>>>>>>>>> 1.5 minutes frequently. I'm attaching a snapshot of the dashboard
>>>>>>>>>> for your reference.
>>>>>>>>>>
>>>>>>>>>> Is this an issue with Flink checkpointing?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
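The connected-streams pattern from the original question - a slow control stream (ks2) populating keyed state that a fast event stream (ks1) is matched against - can be sketched without any Flink APIs, using a plain map in place of per-key ValueState. All names here (`JoinSketch`, `flatMap1`, `flatMap2`) are illustrative only; a real Flink job would implement `CoFlatMapFunction` and use managed state:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the CoFlatMapFunction described above:
// flatMap2 installs/removes entries from the slow stream (ks2),
// flatMap1 matches fast-stream events (ks1) against that state.
class JoinSketch {
    // Plays the role of Flink's per-key ValueState in the real job.
    private final Map<String, Long> state = new HashMap<>();

    // Slow stream: install a threshold for a key, or remove it with null.
    void flatMap2(String key, Long value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    // Fast stream: emit a match only if the key has state installed
    // and the observed value exceeds the stored threshold.
    Optional<String> flatMap1(String key, long observed) {
        Long threshold = state.get(key);
        if (threshold != null && observed > threshold) {
            return Optional.of(key + " exceeded " + threshold + " with " + observed);
        }
        return Optional.empty();
    }
}
```

This also makes the reported symptom easy to picture: until the first ks2 element arrives, `flatMap2` has never run, the state is empty, and there is nothing for the slow side to commit or snapshot - matching Stephan's observation that the delay starts with the first record.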