Thanks for your prompt response Stephan.

    I'd wait for Flink 1.1.3 !!!

Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <se...@apache.org> wrote:

> The plan to release 1.1.3 is asap ;-)
>
> Waiting for last backported patched to get in, then release testing and
> release.
>
> If you want to test it today, you would need to manually build the
> release-1.1 branch.
>
> Best,
> Stephan
>
>
> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Gordon,
>>
>>      Do I need to clone and build release-1.1 branch to test this?
>>      I currently use flinlk 1.1.2 runtime. When is the plan to release it
>> in 1.1.3?
>>
>> Best Regards
>> Varaga
>>
>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Helping out here: this is the PR for async Kafka offset committing -
>>> https://github.com/apache/flink/pull/2574.
>>> It has already been merged into the master and release-1.1 branches, so
>>> you can try out the changes now if you’d like.
>>> The change should also be included in the 1.1.3 release, which the Flink
>>> community is discussing to release soon.
>>>
>>> Will definitely be helpful if you can provide feedback afterwards!
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>> chakravarth...@gmail.com) wrote:
>>>
>>> Hi Stephan,
>>>
>>>     Is the Async kafka offset commit released in 1.3.1?
>>>
>>> Varaga
>>>
>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>>      That should be great. Let me know once the fix is done and the
>>>> snapshot version to use, I'll check and revert then.
>>>>      Can you also share the JIRA that tracks the issue?
>>>>
>>>>      With regards to offset commit issue, I'm not sure as to how to
>>>> proceed here. Probably I'll use your fix first and see if the problem
>>>> reoccurs.
>>>>
>>>> Thanks much
>>>> Varaga
>>>>
>>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> @CVP
>>>>>
>>>>> Flink stores in checkpoints in your case only the Kafka offsets (few
>>>>> bytes) and the custom state (e).
>>>>>
>>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>>> Flink docs).
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/inter
>>>>> nals/stream_checkpointing.html
>>>>>
>>>>>
>>>>> I am quite puzzled why the offset committing problem occurs only for
>>>>> one input, and not for the other.
>>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>> chakravarth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>>
>>>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
>>>>>> zookeeper. There is only 1 instance (standalone) of zookeeper running on 
>>>>>> my
>>>>>> localhost (ubuntu 14.04)
>>>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>>>>>>
>>>>>>      With regards to Flink cluster there's only 1 JM & 2 TMs started
>>>>>> with no HA. I presume this does not use zookeeper anyways as it runs as
>>>>>> standalone cluster.
>>>>>>
>>>>>>
>>>>>>      BTW., The kafka connector version that I use is as suggested in
>>>>>> the flink connectors page
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *.        <dependency>
>>>>>> <groupId>org.apache.flink</groupId>
>>>>>> <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>> <version>1.1.1</version>         </dependency>*
>>>>>>
>>>>>>      Do you see any issues with versions?
>>>>>>
>>>>>>      1) Do you have benchmarks wrt., to checkpointing in flink?
>>>>>>
>>>>>>      2) There isn't detailed explanation on what states are stored as
>>>>>> part of the checkpointing process. For ex.,  If I have pipeline like
>>>>>> *source -> map -> keyBy -> map -> sink, my assumption on what's
>>>>>> stored is:*
>>>>>>
>>>>>> *         a) The source stream's custom watermarked records*
>>>>>>
>>>>>> *         b) Intermediate states of each of the transformations in
>>>>>> the pipeline*
>>>>>>
>>>>>> *         c) Delta of Records stored from the previous sink*
>>>>>>
>>>>>> *         d) Custom States (SayValueState as in my case) -
>>>>>> Essentially this is what I bother about storing.*
>>>>>> *         e) All of my operators*
>>>>>>
>>>>>>       Is my understanding right?
>>>>>>
>>>>>>      3) Is there a way in Flink to checkpoint only d) as stated above
>>>>>>
>>>>>>      4) Can you apply checkpointing to only streams and certain
>>>>>> operators (say I wish to store aggregated values part of the 
>>>>>> transformation)
>>>>>>
>>>>>> Best Regards
>>>>>> CVP
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, the logs were very helpful!
>>>>>>>
>>>>>>> TL:DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>>> proper starting of checkpoints.
>>>>>>>
>>>>>>> Here is what is happening in detail:
>>>>>>>
>>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>>> checkpoint" message and when the point when the KafkaSource actually 
>>>>>>> starts
>>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka 
>>>>>>> Inputs
>>>>>>> (the other is fine).
>>>>>>>   - The only way this delayed can be introduced is if another
>>>>>>> checkpoint related operation (such as trigger() or notifyComplete() ) is
>>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>>> concurrency model for users.
>>>>>>>   - The operation that is still in progress must be the committing
>>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>>> happens once one side receives the first record. Before that, there is
>>>>>>> nothing to commit.
>>>>>>>
>>>>>>>
>>>>>>> What Flink should fix:
>>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
>>>>>>>
>>>>>>> What you can fix:
>>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>>>>>> well, the other does not. Do they go against different sets of brokers, 
>>>>>>> or
>>>>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>>>>   - In the next Flink version, you may opt-out of committing offsets
>>>>>>> to Kafka/ZooKeeper all together. It is not important for Flink's
>>>>>>> checkpoints anyways.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>>     Please find my responses below.
>>>>>>>>
>>>>>>>>     - What source are you using for the slow input?
>>>>>>>> *     [CVP] - Both stream as pointed out in my first mail, are
>>>>>>>> Kafka Streams*
>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>
>>>>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>>>> below.*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *         final StreamExecutionEnvironment streamEnv =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> streamEnv.setStateBackend(new
>>>>>>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>> streamEnv.enableCheckpointing(10000); *
>>>>>>>>
>>>>>>>>
>>>>>>>> *      In terms of the state stored, the KS1 stream has payload of
>>>>>>>> 100K events/second, while KS2 have about 1 event / 10 minutes... 
>>>>>>>> basically
>>>>>>>> the operators perform flatmaps on 8 fields of tuple (all fields are
>>>>>>>> primitives). If you look at the states' sizes in dashboard they are in
>>>>>>>> Kb... *
>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>>     [CVP] -There are no back pressure atleast from the sample
>>>>>>>> computation in the flink dashboard. 100K/second is low load for flink's
>>>>>>>> benchmarks. I could not quite get the barriers vs snapshot state. I 
>>>>>>>> have
>>>>>>>> attached the Task Manager log (DEBUG) info if that will interest you.
>>>>>>>>
>>>>>>>>      I have attached the checkpoints times' as .png from the
>>>>>>>> dashboard. Basically if you look at checkpoint IDs 28 & 29 &30- you'd
>>>>>>>> see that the checkpoints take more than a minute in each case. Before 
>>>>>>>> these
>>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>>> event(should be in bytes) was generated, the checkpoints went slow and
>>>>>>>> subsequently a minute more for every checkpoint thereafter.
>>>>>>>>
>>>>>>>>    This log was collected from the standalone flink cluster with 1
>>>>>>>> job manager & 2 TMs. 1 TM was running this application with 
>>>>>>>> checkpointing
>>>>>>>> (parallelism=1)
>>>>>>>>
>>>>>>>>     Please let me know if you need further info.,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>>> information?
>>>>>>>>>
>>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>>> takes that long, or if it simply takes long for the checkpoint 
>>>>>>>>> barriers to
>>>>>>>>> travel through the stream due to a lot of backpressure?
>>>>>>>>>
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi CVP,
>>>>>>>>>>
>>>>>>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>>
>>>>>>>>>>>     I have a stream application that has 2 stream source as
>>>>>>>>>>> below.
>>>>>>>>>>>
>>>>>>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*") ;
>>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>>
>>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>>      //X is a CoFlatMapFunction that inserts and removes
>>>>>>>>>>> elements from ks2 into a key-value state member. Elements from ks1 
>>>>>>>>>>> are
>>>>>>>>>>> matched against that state. the CoFlatMapFunction operator maintains
>>>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>>
>>>>>>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>>> Precisely when the 1st event is consumed from this stream, 
>>>>>>>>>>> checkpoint takes
>>>>>>>>>>> 2 minutes straight away.
>>>>>>>>>>>
>>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>>>
>>>>>>>>>>> I tried to use checkpoint every 10 Secs using a
>>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration is 
>>>>>>>>>>> almost 2
>>>>>>>>>>> minutes for many cases, while for the other cases it varies from 
>>>>>>>>>>> 100 ms to
>>>>>>>>>>> 1.5 minutes frequently. I'm attaching the snapshot of the dashboard 
>>>>>>>>>>> for
>>>>>>>>>>> your reference.
>>>>>>>>>>>
>>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>>
>>>>>>>>>>>  Best Regards
>>>>>>>>>>> CVP
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to