BRILLIANT !!!

Checkpoint times are consistent with 1.1.4...

Thanks for your formidable support !

Best Regards
CVP

On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi CVP,
>
> we recently released Flink 1.1.4, i.e., the next bugfix release of the
> 1.1.x series with major robustness improvements [1].
> You might want to give 1.1.4 a try as well.
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>
> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>
>> Hi Stephan, All,
>>
>>      I just got a chance to check whether 1.1.3 fixes the slow checkpointing
>> on the FS backend. It seems to have been fixed. Thanks for the fix.
>>
>>      While testing this with varying checkpoint intervals, there seem to
>> be spikes of slow checkpoints every 30/40 seconds for an interval of 15
>> secs. During a spike the checkpoint takes about 300 ms as opposed to
>> 10/20 ms.
>>      Basically, 15 secs seems to be the nominal value so far. Anything
>> below this interval triggers the spikes too often. For us, living with a
>> 15 sec recovery is doable, and we eventually catch up on recovery!
>>
>> Best Regards
>> CVP
>>
>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Thanks for your prompt response Stephan.
>>>
>>>     I'd wait for Flink 1.1.3 !!!
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> The plan to release 1.1.3 is asap ;-)
>>>>
>>>> Waiting for the last backported patches to get in, then release testing
>>>> and release.
>>>>
>>>> If you want to test it today, you would need to manually build the
>>>> release-1.1 branch.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
>>>> chakravarth...@gmail.com> wrote:
>>>>
>>>>> Hi Gordon,
>>>>>
>>>>>      Do I need to clone and build release-1.1 branch to test this?
>>>>>      I currently use the Flink 1.1.2 runtime. When is the plan to release
>>>>> it in 1.1.3?
>>>>>
>>>>> Best Regards
>>>>> Varaga
>>>>>
>>>>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>>>>> tzuli...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Helping out here: this is the PR for async Kafka offset committing -
>>>>>> https://github.com/apache/flink/pull/2574.
>>>>>> It has already been merged into the master and release-1.1 branches,
>>>>>> so you can try out the changes now if you’d like.
>>>>>> The change should also be included in the 1.1.3 release, which the
>>>>>> Flink community is discussing to release soon.
>>>>>>
>>>>>> Will definitely be helpful if you can provide feedback afterwards!
>>>>>>
>>>>>> Best Regards,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>>>>> chakravarth...@gmail.com) wrote:
>>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>     Is the async Kafka offset commit released in 1.1.3?
>>>>>>
>>>>>> Varaga
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>>      That should be great. Let me know once the fix is done and the
>>>>>>> snapshot version to use, I'll check and revert then.
>>>>>>>      Can you also share the JIRA that tracks the issue?
>>>>>>>
>>>>>>>      With regards to offset commit issue, I'm not sure as to how to
>>>>>>> proceed here. Probably I'll use your fix first and see if the problem
>>>>>>> reoccurs.
>>>>>>>
>>>>>>> Thanks much
>>>>>>> Varaga
>>>>>>>
>>>>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> @CVP
>>>>>>>>
>>>>>>>> In your case, Flink stores in its checkpoints only the Kafka offsets
>>>>>>>> (a few bytes) and the custom state (e).
>>>>>>>>
>>>>>>>> Here is an illustration of the checkpoint and what is stored (from
>>>>>>>> the Flink docs).
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>>>>>>
>>>>>>>>
>>>>>>>> I am quite puzzled why the offset committing problem occurs only
>>>>>>>> for one input, and not for the other.
>>>>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>>>>> Could you try out a snapshot version to see if that fixes your
>>>>>>>> problem?
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stephan,
>>>>>>>>>
>>>>>>>>>      Thanks a million for your detailed explanation. I appreciate
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to
>>>>>>>>> start zookeeper. There is only 1 instance (standalone) of zookeeper
>>>>>>>>> running on my localhost (ubuntu 14.04)
>>>>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*)
>>>>>>>>>
>>>>>>>>>      With regard to the Flink cluster, there is only 1 JM & 2 TMs
>>>>>>>>> started with no HA. I presume this does not use zookeeper anyway, as
>>>>>>>>> it runs as a standalone cluster.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      BTW, the Kafka connector version that I use is as suggested
>>>>>>>>> in the Flink connectors page:
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>         <groupId>org.apache.flink</groupId>
>>>>>>>>>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>>>>>         <version>1.1.1</version>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>      Do you see any issues with versions?
>>>>>>>>>
>>>>>>>>>      1) Do you have benchmarks with regard to checkpointing in Flink?
>>>>>>>>>
>>>>>>>>>      2) There isn't a detailed explanation of which states are stored
>>>>>>>>> as part of the checkpointing process. For example, if I have a
>>>>>>>>> pipeline like source -> map -> keyBy -> map -> sink, my assumption on
>>>>>>>>> what's stored is:
>>>>>>>>>
>>>>>>>>>          a) The source stream's custom watermarked records
>>>>>>>>>          b) Intermediate states of each of the transformations in
>>>>>>>>>             the pipeline
>>>>>>>>>          c) Delta of records stored from the previous sink
>>>>>>>>>          d) Custom states (say, ValueState as in my case) -
>>>>>>>>>             essentially this is what I care about storing.
>>>>>>>>>          e) All of my operators
>>>>>>>>>
>>>>>>>>>       Is my understanding right?
>>>>>>>>>
>>>>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated
>>>>>>>>> above?
>>>>>>>>>
>>>>>>>>>      4) Can you apply checkpointing to only streams and certain
>>>>>>>>> operators (say I wish to store aggregated values as part of the
>>>>>>>>> transformation)?
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, the logs were very helpful!
>>>>>>>>>>
>>>>>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and
>>>>>>>>>> prevents proper starting of checkpoints.
>>>>>>>>>>
>>>>>>>>>> Here is what is happening in detail:
>>>>>>>>>>
>>>>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>>>>> checkpoint" message and the point when the KafkaSource actually
>>>>>>>>>> starts the checkpoint, a long time passes (many seconds) - for one of
>>>>>>>>>> the Kafka inputs (the other is fine).
>>>>>>>>>>   - The only way this delay can be introduced is if another
>>>>>>>>>> checkpoint-related operation (such as trigger() or notifyComplete())
>>>>>>>>>> is still in progress when the checkpoint is started. Flink does not
>>>>>>>>>> perform concurrent checkpoint operations on a single operator, to
>>>>>>>>>> ease the concurrency model for users.
>>>>>>>>>>   - The operation that is still in progress must be the
>>>>>>>>>> committing of the offsets (to ZooKeeper or Kafka). That also explains
>>>>>>>>>> why this only happens once one side receives the first record. Before
>>>>>>>>>> that, there is nothing to commit.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What Flink should fix:
>>>>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>>>>
>>>>>>>>>> What you can fix:
>>>>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input
>>>>>>>>>> works well, the other does not. Do they go against different sets of
>>>>>>>>>> brokers, or different ZooKeepers? Is the metadata for one input bad?
>>>>>>>>>>   - In the next Flink version, you may opt out of committing
>>>>>>>>>> offsets to Kafka/ZooKeeper altogether. It is not important for
>>>>>>>>>> Flink's checkpoints anyway.
>>>>>>>>>>
>>>>>>>>>> Greetings,
>>>>>>>>>> Stephan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Stephan,
>>>>>>>>>>>
>>>>>>>>>>>     Please find my responses below.
>>>>>>>>>>>
>>>>>>>>>>>     - What source are you using for the slow input?
>>>>>>>>>>>     [CVP] - Both streams, as pointed out in my first mail, are
>>>>>>>>>>> Kafka streams.
>>>>>>>>>>>     - How large is the state that you are checkpointing?
>>>>>>>>>>>     [CVP] - I have enabled checkpointing on the
>>>>>>>>>>> StreamExecutionEnvironment as below.
>>>>>>>>>>>
>>>>>>>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>         streamEnv.setStateBackend(
>>>>>>>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>>>>>>>
>>>>>>>>>>>     In terms of the state stored, the KS1 stream has a payload
>>>>>>>>>>> of 100K events/second, while KS2 has about 1 event / 10 minutes.
>>>>>>>>>>> Basically the operators perform flatmaps on 8 fields of a tuple (all
>>>>>>>>>>> fields are primitives). If you look at the states' sizes in the
>>>>>>>>>>> dashboard, they are in KB.
>>>>>>>>>>>     - Can you try to see in the log if actually the state snapshot
>>>>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>>>>     [CVP] - There is no back pressure, at least from the sample
>>>>>>>>>>> computation in the Flink dashboard. 100K/second is a low load by
>>>>>>>>>>> Flink's benchmarks. I could not quite get the barriers vs snapshot
>>>>>>>>>>> state. I have attached the Task Manager log (DEBUG) in case that
>>>>>>>>>>> interests you.
>>>>>>>>>>>
>>>>>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30,
>>>>>>>>>>> you'd see that the checkpoints take more than a minute in each case.
>>>>>>>>>>> Before these checkpoints, the KS2 stream did not have any events. As
>>>>>>>>>>> soon as an event (should be in bytes) was generated, the checkpoints
>>>>>>>>>>> went slow, and subsequently took a minute more for every checkpoint
>>>>>>>>>>> thereafter.
>>>>>>>>>>>
>>>>>>>>>>>    This log was collected from the standalone Flink cluster with
>>>>>>>>>>> 1 job manager & 2 TMs. 1 TM was running this application with
>>>>>>>>>>> checkpointing (parallelism=1).
>>>>>>>>>>>
>>>>>>>>>>>     Please let me know if you need further info.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi!
>>>>>>>>>>>>
>>>>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>>>>> information?
>>>>>>>>>>>>
>>>>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>>>>   - Can you try to see in the log if actually the state
>>>>>>>>>>>> snapshot takes that long, or if it simply takes long for the
>>>>>>>>>>>> checkpoint barriers to travel through the stream due to a lot of
>>>>>>>>>>>> backpressure?
>>>>>>>>>>>>
>>>>>>>>>>>> Greetings,
>>>>>>>>>>>> Stephan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <
>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi CVP,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm not very familiar with the internals of the
>>>>>>>>>>>>> checkpointing system, but maybe Stephan (in CC) has an idea
>>>>>>>>>>>>> what's going on here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     I have a stream application that has 2 stream sources as
>>>>>>>>>>>>>> below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>>>>      // X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>>>>>      // elements from ks2 into a key-value state member. Elements
>>>>>>>>>>>>>>      // from ks1 are matched against that state. The
>>>>>>>>>>>>>>      // CoFlatMapFunction operator maintains
>>>>>>>>>>>>>>      // ValueState<Tuple2<Long, Long>>.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes.
>>>>>>>>>>>>>>      // Precisely when the 1st event is consumed from this stream,
>>>>>>>>>>>>>>      // the checkpoint takes 2 minutes straight away.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried checkpointing every 10 secs using an
>>>>>>>>>>>>>> FsStateBackend. What I notice is that the checkpoint duration is
>>>>>>>>>>>>>> almost 2 minutes in many cases, while in the other cases it
>>>>>>>>>>>>>> frequently varies from 100 ms to 1.5 minutes. I'm attaching a
>>>>>>>>>>>>>> snapshot of the dashboard for your reference.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Best Regards
>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
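
For readers of this thread: Stephan's point that the KafkaConsumer should run
the commit operations asynchronously, so as not to block
"notifyCheckpointComplete()", can be sketched in plain Java roughly as below.
This is only an illustration of the idea, not the code from the PR Gordon
linked. The class AsyncCommitSketch and the interface OffsetCommitter are
hypothetical names, and the slow commit to ZooKeeper/Kafka is simulated with a
sleep.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a source operator; NOT Flink's actual consumer code. */
public class AsyncCommitSketch implements AutoCloseable {

    /** Hypothetical abstraction of "commit this offset to ZooKeeper/Kafka". */
    @FunctionalInterface
    public interface OffsetCommitter {
        void commit(long offset) throws Exception;
    }

    private final OffsetCommitter committer;
    // A single background thread keeps commits ordered but off the caller's thread.
    private final ExecutorService commitThread = Executors.newSingleThreadExecutor();
    private final AtomicLong lastCommitted = new AtomicLong(-1L);

    public AsyncCommitSketch(OffsetCommitter committer) {
        this.committer = committer;
    }

    /**
     * Hands the commit to the background thread and returns immediately, so a
     * slow commit cannot delay the next checkpoint operation on this operator.
     */
    public CompletableFuture<Void> notifyCheckpointComplete(long offset) {
        return CompletableFuture.runAsync(() -> {
            try {
                committer.commit(offset);
                lastCommitted.set(offset);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, commitThread);
    }

    /** Last offset whose commit has finished, or -1 if none has. */
    public long lastCommitted() {
        return lastCommitted.get();
    }

    @Override
    public void close() {
        commitThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        // Simulate a commit that takes 300 ms.
        try (AsyncCommitSketch source = new AsyncCommitSketch(offset -> Thread.sleep(300))) {
            CompletableFuture<Void> pending = source.notifyCheckpointComplete(42L);
            System.out.println("returned before commit finished: " + !pending.isDone());
            pending.get(); // wait only so the demo can print the result
            System.out.println("last committed offset: " + source.lastCommitted());
        }
    }
}
```

The single-threaded executor is a design choice of this sketch: it keeps
commits in order, at the cost of queueing them if the external system stays
slow.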
