Thanks for your prompt response, Stephan. I'll wait for Flink 1.1.3!
Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:

> The plan is to release 1.1.3 asap ;-)
>
> Waiting for the last backported patches to get in, then release testing and
> release.
>
> If you want to test it today, you would need to manually build the
> release-1.1 branch.
>
> Best,
> Stephan
>
> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>
>> Hi Gordon,
>>
>> Do I need to clone and build the release-1.1 branch to test this?
>> I currently use the Flink 1.1.2 runtime. When is the plan to release it
>> in 1.1.3?
>>
>> Best Regards
>> Varaga
>>
>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Helping out here: this is the PR for async Kafka offset committing -
>>> https://github.com/apache/flink/pull/2574.
>>> It has already been merged into the master and release-1.1 branches, so
>>> you can try out the changes now if you'd like.
>>> The change should also be included in the 1.1.3 release, which the Flink
>>> community is discussing releasing soon.
>>>
>>> It will definitely be helpful if you can provide feedback afterwards!
>>>
>>> Best Regards,
>>> Gordon
>>>
>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarth...@gmail.com) wrote:
>>>
>>> Hi Stephan,
>>>
>>> Has the async Kafka offset commit been released in 1.1.3?
>>>
>>> Varaga
>>>
>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> That would be great. Let me know once the fix is done and which
>>>> snapshot version to use; I'll check and revert then.
>>>> Can you also share the JIRA that tracks the issue?
>>>>
>>>> With regard to the offset commit issue, I'm not sure how to
>>>> proceed here. I'll probably use your fix first and see if the problem
>>>> reoccurs.
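[Editor's note] The async offset committing discussed here (PR #2574) addresses a commit that runs blocking inside the checkpoint-complete callback. The idea can be sketched in plain Java, outside Flink: if the (slow) commit runs synchronously in the callback, the next checkpoint cannot start until it finishes; handing the commit to a background thread lets the callback return immediately. All class and method names below are made up for illustration; this is not Flink's actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: why a slow synchronous offset commit delays the next
// checkpoint, and how an asynchronous commit avoids the stall.
public class AsyncCommitSketch {
    static final ExecutorService committer = Executors.newSingleThreadExecutor();

    // Simulates a slow ZooKeeper offset commit (seconds in the real case).
    static void commitOffsetsBlocking() throws InterruptedException {
        Thread.sleep(200);
    }

    // Old behavior: the checkpoint-complete callback blocks on the commit.
    static long notifyCheckpointCompleteSync() throws InterruptedException {
        long start = System.nanoTime();
        commitOffsetsBlocking();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Fixed behavior: the commit is handed to a background thread, so the
    // callback returns right away and the next checkpoint can start on time.
    static long notifyCheckpointCompleteAsync() {
        long start = System.nanoTime();
        committer.submit(() -> {
            try {
                commitOffsetsBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        long syncMillis = notifyCheckpointCompleteSync();
        long asyncMillis = notifyCheckpointCompleteAsync();
        System.out.println("sync blocked ~" + syncMillis
                + " ms, async returned in " + asyncMillis + " ms");
        committer.shutdown();
        committer.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```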
>>>>
>>>> Thanks much
>>>> Varaga
>>>>
>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> @CVP
>>>>>
>>>>> In your case, Flink stores in checkpoints only the Kafka offsets (a few
>>>>> bytes) and the custom state (e).
>>>>>
>>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>>> Flink docs):
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>
>>>>> I am quite puzzled why the offset committing problem occurs only for
>>>>> one input, and not for the other.
>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>> Thanks a million for your detailed explanation. I appreciate it.
>>>>>>
>>>>>> - The ZooKeeper bundled with Kafka 0.9.0.1 was used to start
>>>>>> ZooKeeper. There is only 1 instance (standalone) of ZooKeeper running on my
>>>>>> localhost (Ubuntu 14.04).
>>>>>> - There is only 1 Kafka broker (version: 0.9.0.1).
>>>>>>
>>>>>> As for the Flink cluster, there is only 1 JM & 2 TMs, started
>>>>>> with no HA. I presume this does not use ZooKeeper anyway, as it runs as a
>>>>>> standalone cluster.
>>>>>>
>>>>>> BTW, the Kafka connector version that I use is as suggested on
>>>>>> the Flink connectors page:
>>>>>>
>>>>>>     <dependency>
>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>         <version>1.1.1</version>
>>>>>>     </dependency>
>>>>>>
>>>>>> Do you see any issues with versions?
>>>>>>
>>>>>> 1) Do you have benchmarks w.r.t. checkpointing in Flink?
>>>>>>
>>>>>> 2) There isn't a detailed explanation of which states are stored as
>>>>>> part of the checkpointing process. For example, if I have a pipeline like
>>>>>> source -> map -> keyBy -> map -> sink, my assumption on what's stored is:
>>>>>>
>>>>>>     a) The source stream's custom watermarked records
>>>>>>     b) Intermediate states of each of the transformations in the pipeline
>>>>>>     c) Delta of records stored from the previous sink
>>>>>>     d) Custom states (say, ValueState as in my case) -
>>>>>>        essentially this is what I care about storing
>>>>>>     e) All of my operators
>>>>>>
>>>>>> Is my understanding right?
>>>>>>
>>>>>> 3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>>>
>>>>>> 4) Can you apply checkpointing to only certain streams and
>>>>>> operators (say I wish to store aggregated values that are part of the
>>>>>> transformation)?
>>>>>>
>>>>>> Best Regards
>>>>>> CVP
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> Thanks, the logs were very helpful!
>>>>>>>
>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>>> proper starting of checkpoints.
>>>>>>>
>>>>>>> Here is what is happening in detail:
>>>>>>>
>>>>>>> - Between the point when the TaskManager receives the "trigger
>>>>>>> checkpoint" message and the point when the KafkaSource actually starts
>>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka
>>>>>>> inputs (the other is fine).
>>>>>>> - The only way this delay can be introduced is if another
>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>> concurrency model for users.
>>>>>>> - The operation that is still in progress must be the committing
>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>>> happens once one side receives the first record. Before that, there is
>>>>>>> nothing to commit.
>>>>>>>
>>>>>>> What Flink should fix:
>>>>>>> - The KafkaConsumer should run the commit operations
>>>>>>> asynchronously, so as not to block the "notifyCheckpointComplete()" method.
>>>>>>>
>>>>>>> What you can fix:
>>>>>>> - Have a look at your Kafka/ZooKeeper setup. One Kafka input works
>>>>>>> well, the other does not. Do they go against different sets of brokers, or
>>>>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>>>> - In the next Flink version, you may opt out of committing offsets
>>>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>>> checkpoints anyway.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stephan,
>>>>>>>>
>>>>>>>> Please find my responses below.
>>>>>>>>
>>>>>>>> - What source are you using for the slow input?
>>>>>>>> [CVP] - Both streams, as pointed out in my first mail, are Kafka streams.
>>>>>>>>
>>>>>>>> - How large is the state that you are checkpointing?
>>>>>>>> [CVP] - I have enabled checkpointing on the StreamExecutionEnvironment as below:
>>>>>>>>
>>>>>>>>     final StreamExecutionEnvironment streamEnv =
>>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     streamEnv.setStateBackend(new
>>>>>>>>         FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>     streamEnv.enableCheckpointing(10000);
>>>>>>>>
>>>>>>>> In terms of the state stored, the KS1 stream has a payload of
>>>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>>> Basically
>>>>>>>> the operators perform flatMaps on 8 fields of a tuple (all fields are
>>>>>>>> primitives). If you look at the states' sizes in the dashboard, they are
>>>>>>>> in KB...
>>>>>>>>
>>>>>>>> - Can you try to see in the log if actually the state snapshot
>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>> [CVP] - There is no backpressure, at least from the sample
>>>>>>>> computation in the Flink dashboard. 100K/second is a low load for Flink's
>>>>>>>> benchmarks. I could not quite get the barriers vs. snapshot state. I have
>>>>>>>> attached the TaskManager log (DEBUG) info if that will interest you.
>>>>>>>>
>>>>>>>> I have attached the checkpoint times as .png from the
>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>>> see that the checkpoints take more than a minute in each case. Before
>>>>>>>> these checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>>> event (should be in bytes) was generated, the checkpoints went slow, and
>>>>>>>> took a minute more for every checkpoint thereafter.
>>>>>>>>
>>>>>>>> This log was collected from the standalone Flink cluster with 1
>>>>>>>> job manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>>>>> (parallelism=1).
>>>>>>>>
>>>>>>>> Please let me know if you need further info.
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>> information?
>>>>>>>>>
>>>>>>>>> - What source are you using for the slow input?
>>>>>>>>> - How large is the state that you are checkpointing?
>>>>>>>>> - Can you try to see in the log if actually the state snapshot
>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi CVP,
>>>>>>>>>>
>>>>>>>>>> I'm not so familiar with the internals of the checkpointing
>>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>
>>>>>>>>>>> I have a streaming application that has 2 stream sources as below:
>>>>>>>>>>>
>>>>>>>>>>>     KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>     KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>         ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>
>>>>>>>>>>>     ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>     // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>>>     // from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>>>>>     // matched against that state. The CoFlatMapFunction operator
>>>>>>>>>>>     // maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>
>>>>>>>>>>>     // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>>>     // ks2 is streaming about 1 event every 10 minutes... Precisely
>>>>>>>>>>>     // when the 1st event is consumed from this stream, the
>>>>>>>>>>>     // checkpoint takes 2 minutes straight away.
>>>>>>>>>>>
>>>>>>>>>>> The version of Flink is 1.1.2.
>>>>>>>>>>>
>>>>>>>>>>> I tried to checkpoint every 10 secs using a FsStateBackend...
What I notice is that the checkpoint duration is >>>>>>>>>>> almost 2 >>>>>>>>>>> minutes for many cases, while for the other cases it varies from >>>>>>>>>>> 100 ms to >>>>>>>>>>> 1.5 minutes frequently. I'm attaching the snapshot of the dashboard >>>>>>>>>>> for >>>>>>>>>>> your reference. >>>>>>>>>>> >>>>>>>>>>> Is this an issue with flink checkpointing? >>>>>>>>>>> >>>>>>>>>>> Best Regards >>>>>>>>>>> CVP >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >