Hi Stephan,
Has the asynchronous Kafka offset commit been released in 1.1.3?
Varaga
On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
[email protected]> wrote:
> Hi Stephan,
>
> That should be great. Let me know once the fix is done and the
> snapshot version to use, I'll check and revert then.
> Can you also share the JIRA that tracks the issue?
>
> With regards to offset commit issue, I'm not sure as to how to
> proceed here. Probably I'll use your fix first and see if the problem
> reoccurs.
>
> Thanks much
> Varaga
>
> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <[email protected]> wrote:
>
>> @CVP
>>
>> In your case, Flink stores only the Kafka offsets (a few bytes) and the
>> custom state (d) in its checkpoints.
>>
>> Here is an illustration of the checkpoint and what is stored (from the
>> Flink docs).
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>
>>
>> I am quite puzzled why the offset committing problem occurs only for one
>> input, and not for the other.
>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>> Could you try out a snapshot version to see if that fixes your problem?
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>> [email protected]> wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks a million for your detailed explanation. I appreciate it.
>>>
>>> - The *zookeeper bundled with kafka 0.9.0.1* was used to start
>>> zookeeper. There is only 1 instance (standalone) of zookeeper running on my
>>> localhost (ubuntu 14.04)
>>> - There is only 1 Kafka broker (*version: 0.9.0.1* )
>>>
>>> With regards to Flink cluster there's only 1 JM & 2 TMs started
>>> with no HA. I presume this does not use zookeeper anyways as it runs as
>>> standalone cluster.
>>>
>>>
>>> BTW, the Kafka connector version that I use is the one suggested on the
>>> Flink connectors page:
>>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>   <version>1.1.1</version>
>>> </dependency>
>>>
>>> Do you see any issues with versions?
>>>
>>> 1) Do you have benchmarks for checkpointing in Flink?
>>>
>>> 2) There isn't a detailed explanation of which states are stored as
>>> part of the checkpointing process. For example, if I have a pipeline like
>>> *source -> map -> keyBy -> map -> sink*, my assumption about what's stored
>>> is:
>>>
>>> a) The source stream's custom watermarked records
>>>
>>> b) Intermediate states of each of the transformations in the
>>> pipeline
>>>
>>> c) Delta of records stored from the previous sink
>>>
>>> d) Custom states (say, ValueState as in my case). Essentially,
>>> this is what I care about storing.
>>>
>>> e) All of my operators
>>>
>>> Is my understanding right?
>>>
>>> 3) Is there a way in Flink to checkpoint only d) as stated above?
>>>
>>> 4) Can you apply checkpointing to only streams and certain
>>> operators (say I wish to store aggregated values that are part of the
>>> transformation)?
>>>
>>> Best Regards
>>> CVP
>>>
>>>
>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <[email protected]> wrote:
>>>
>>>> Thanks, the logs were very helpful!
>>>>
>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>> checkpoints from starting properly.
>>>>
>>>> Here is what is happening in detail:
>>>>
>>>> - Between the point when the TaskManager receives the "trigger
>>>> checkpoint" message and the point when the KafkaSource actually starts
>>>> the checkpoint, a long time passes (many seconds) for one of the Kafka
>>>> inputs (the other is fine).
>>>> - The only way this delay can be introduced is if another
>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>> still in progress when the checkpoint is started. Flink does not perform
>>>> concurrent checkpoint operations on a single operator, to ease the
>>>> concurrency model for users.
>>>> - The operation that is still in progress must be the committing of
>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>> happens once one side receives the first record. Before that, there is
>>>> nothing to commit.
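
To make the serialization effect described above concrete, here is a minimal plain-Java sketch. It is not Flink code: the class name, the single lock, and the simulated commit duration are all assumptions chosen for illustration. It only shows how one slow call made under a shared lock delays the next call that needs the same lock.

```java
import java.util.concurrent.CountDownLatch;

final class SerializedCheckpointOps {
    // One lock guards all checkpoint-related calls, mimicking the rule that
    // an operator never runs two of them concurrently (assumed model).
    private final Object checkpointLock = new Object();
    private final CountDownLatch commitStarted = new CountDownLatch(1);

    // Hypothetical stand-in for a completion callback with a blocking commit.
    void notifyComplete(long commitMillis) throws InterruptedException {
        synchronized (checkpointLock) {
            commitStarted.countDown();
            Thread.sleep(commitMillis); // simulated slow offset commit
        }
    }

    // Returns how long the trigger waited for the lock, in milliseconds.
    long trigger() {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            return (System.nanoTime() - start) / 1_000_000;
        }
    }

    // Runs one slow commit in the background and measures the next trigger's delay.
    static long delayCausedBySlowCommit(long commitMillis) {
        SerializedCheckpointOps op = new SerializedCheckpointOps();
        Thread committer = new Thread(() -> {
            try {
                op.notifyComplete(commitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        committer.start();
        try {
            op.commitStarted.await(); // the commit now holds the lock
            long waited = op.trigger();
            committer.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("trigger waited ~" + delayCausedBySlowCommit(300) + " ms");
    }
}
```

In this model the trigger is delayed by roughly the remaining duration of the commit, which matches the symptom of a checkpoint that starts many seconds after the trigger message arrives.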
>>>>
>>>>
>>>> What Flink should fix:
>>>> - The KafkaConsumer should run the commit operations asynchronously,
>>>> to not block the "notifyCheckpointComplete()" method.
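
A rough sketch of what committing asynchronously could look like, in plain Java. This is not the actual Flink fix: the class name, the single-thread executor, and the sleep that stands in for a slow ZooKeeper/Kafka commit are assumptions made for illustration. The point is only that the callback hands the commit to a background thread and returns right away.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class AsyncOffsetCommitter {
    // A single background thread keeps commits in order (assumed design).
    private final ExecutorService commitThread = Executors.newSingleThreadExecutor();
    private final AtomicLong lastCommitted = new AtomicLong(-1L);
    private final long commitMillis;

    AsyncOffsetCommitter(long commitMillis) {
        this.commitMillis = commitMillis;
    }

    // Returns immediately; the slow commit runs on the background thread.
    void notifyCheckpointComplete(long offset) {
        commitThread.execute(() -> {
            try {
                Thread.sleep(commitMillis); // simulated slow commit
                lastCommitted.set(offset);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    long lastCommittedOffset() {
        return lastCommitted.get();
    }

    // Waits for pending commits; returns false on timeout or interruption.
    boolean close(long timeoutMillis) {
        commitThread.shutdown();
        try {
            return commitThread.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncOffsetCommitter committer = new AsyncOffsetCommitter(300);
        long start = System.nanoTime();
        committer.notifyCheckpointComplete(42L);
        long callMillis = (System.nanoTime() - start) / 1_000_000;
        committer.close(5_000);
        System.out.println("callback returned after " + callMillis
                + " ms, committed offset " + committer.lastCommittedOffset());
    }
}
```

A single-thread executor is used here so that commits are applied in submission order. A real implementation would also have to decide what to do when commits pile up faster than they complete.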
>>>>
>>>> What you can fix:
>>>> - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>>> well, the other does not. Do they go against different sets of brokers, or
>>>> different ZooKeepers? Is the metadata for one input bad?
>>>> - In the next Flink version, you may opt out of committing offsets to
>>>> Kafka/ZooKeeper altogether. It is not important for Flink's checkpoints
>>>> anyway.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Please find my responses below.
>>>>>
>>>>> - What source are you using for the slow input?
>>>>> * [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>> streams*
>>>>> - How large is the state that you are checkpointing?
>>>>>
>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>> below.*
>>>>>
>>>>>
>>>>>
>>>>> final StreamExecutionEnvironment streamEnv =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> streamEnv.setStateBackend(
>>>>>     new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>> streamEnv.enableCheckpointing(10000);
>>>>>
>>>>>
>>>>> * In terms of the state stored, the KS1 stream has a payload of
>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes. Basically,
>>>>> the operators perform flatMaps on 8 fields of a tuple (all fields are
>>>>> primitives). If you look at the state sizes in the dashboard, they are in
>>>>> KB.*
>>>>> - Can you try to see in the log if actually the state snapshot takes
>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>> travel through the stream due to a lot of backpressure?
>>>>> [CVP] - There is no backpressure, at least according to the sample
>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>> benchmarks. I could not quite follow the distinction between barriers and
>>>>> the state snapshot. I have attached the Task Manager log (DEBUG) in case
>>>>> it is of interest.
>>>>>
>>>>> I have attached the checkpoint times as a .png from the
>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 and 30, you'd
>>>>> see that the checkpoints take more than a minute in each case. Before
>>>>> these checkpoints, the KS2 stream did not have any events. As soon as an
>>>>> event (its size should be in bytes) was generated, the checkpoints went
>>>>> slow, and every checkpoint thereafter took a minute more.
>>>>>
>>>>> This log was collected from the standalone flink cluster with 1 job
>>>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>> (parallelism=1)
>>>>>
>>>>> Please let me know if you need further info.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>> information?
>>>>>>
>>>>>> - What source are you using for the slow input?
>>>>>> - How large is the state that you are checkpointing?
>>>>>> - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint barriers
>>>>>> to travel through the stream due to a lot of backpressure?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi CVP,
>>>>>>>
>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>> [email protected]>:
>>>>>>>
>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>
>>>>>>>> I have a stream application that has 2 stream sources as below.
>>>>>>>>
>>>>>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>> KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>     ds2.flatMap(/* split T into k-v pairs */).keyBy(0);
>>>>>>>>
>>>>>>>> ks1.connect(ks2).flatMap(X);
>>>>>>>> // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>> // from ks2 into a key-value state member. Elements from ks1 are matched
>>>>>>>> // against that state. The CoFlatMapFunction operator maintains
>>>>>>>> // ValueState<Tuple2<Long, Long>>.
>>>>>>>>
>>>>>>>> // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>> // ks2 is streaming about 1 event every 10 minutes. Precisely
>>>>>>>> // when the 1st event is consumed from this stream, the checkpoint
>>>>>>>> // takes 2 minutes straight away.
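
For readers who want the matching logic of X spelled out, here is a small plain-Java model. It does not use the Flink API: a HashMap stands in for the per-key ValueState<Tuple2<Long, Long>>, the method names are hypothetical, and the convention that a null pair removes a key is an assumption, since the original post does not say how removals are signalled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ConnectedMatchSketch {
    // Plain-map stand-in for per-key ValueState<Tuple2<Long, Long>>.
    private final Map<String, long[]> stateByKey = new HashMap<>();

    // ks2 side: a non-null pair inserts or updates the key's state,
    // a null pair removes it (assumed convention).
    void onControl(String key, long[] pair) {
        if (pair == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, pair.clone());
        }
    }

    // ks1 side: returns the stored pair when the key currently has state.
    Optional<long[]> onData(String key) {
        return Optional.ofNullable(stateByKey.get(key));
    }

    public static void main(String[] args) {
        ConnectedMatchSketch x = new ConnectedMatchSketch();
        x.onControl("k", new long[] {1L, 2L});
        System.out.println("match for k: " + x.onData("k").isPresent());
        x.onControl("k", null);
        System.out.println("match for k after removal: " + x.onData("k").isPresent());
    }
}
```

The state in this model is tiny, which is consistent with the KB-sized state reported on the dashboard.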
>>>>>>>>
>>>>>>>> The version of flink is 1.1.2.
>>>>>>>>
>>>>>>>> I tried checkpointing every 10 seconds using an FsStateBackend.
>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>>> many cases, while in the other cases it frequently varies from 100 ms
>>>>>>>> to 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>>> reference.
>>>>>>>>
>>>>>>>> Is this an issue with flink checkpointing?
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> CVP
>>>>>>>>