I am not sure about that, I will run the pipeline on cluster and share the
details
Since window is a stateful operator , it will store only the key part in
the state backend and not the value , right ?

Regards,
Vinay Patil

On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n9181...@n4.nabble.com> wrote:

> @vinay - Is it in your case large state that causes slower checkpoints?
>
> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=9181&i=0>> wrote:
>
>> Hi,
>>
>> I am also facing this issue, in my case the data is flowing continuously
>> from the Kafka source, when I increase the checkpoint interval to 60000,
>> the data gets written to S3 sink.
>>
>> Is it because some operator is taking more time for processing, like in
>> my case I am using a time window of 1sec.
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
>> User Mailing List archive.] <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=9179&i=0>> wrote:
>>
>>> Hi Stefan,
>>>
>>>     Please find my responses below.
>>>
>>>     - What source are you using for the slow input?
>>> *     [CVP] - Both stream as pointed out in my first mail, are Kafka
>>> Streams*
>>>   - How large is the state that you are checkpointing?
>>>
>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as below.*
>>>
>>>
>>>
>>> *         final StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> streamEnv.setStateBackend(new
>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>> streamEnv.enableCheckpointing(10000);*
>>>
>>>
>>> *      In terms of the state stored, the KS1 stream has payload of 100K
>>> events/second, while KS2 have about 1 event / 10 minutes... basically the
>>> operators perform flatmaps on 8 fields of tuple (all fields are
>>> primitives). If you look at the states' sizes in dashboard they are in
>>> Kb...*
>>>   - Can you try to see in the log if actually the state snapshot takes
>>> that long, or if it simply takes long for the checkpoint barriers to
>>> travel through the stream due to a lot of backpressure?
>>>     [CVP] -There are no back pressure atleast from the sample
>>> computation in the flink dashboard. 100K/second is low load for flink's
>>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>>> attached the Task Manager log (DEBUG) info if that will interest you.
>>>
>>>      I have attached the checkpoints times' as .png from the dashboard.
>>> Basically if you look at checkpoint IDs 28 & 29 &30- you'd see that the
>>> checkpoints take more than a minute in each case. Before these checkpoints,
>>> the KS2 stream did not have any events. As soon as an event(should be in
>>> bytes) was generated, the checkpoints went slow and subsequently a minute
>>> more for every checkpoint thereafter.
>>>
>>>    This log was collected from the standalone flink cluster with 1 job
>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>> (parallelism=1)
>>>
>>>     Please let me know if you need further info.,
>>>
>>>
>>>
>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=9176&i=0>> wrote:
>>>
>>>> Hi!
>>>>
>>>> Let's try to figure that one out. Can you give us a bit more
>>>> information?
>>>>
>>>>   - What source are you using for the slow input?
>>>>   - How large is the state that you are checkpointing?
>>>>   - Can you try to see in the log if actually the state snapshot takes
>>>> that long, or if it simply takes long for the checkpoint barriers to travel
>>>> through the stream due to a lot of backpressure?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=9176&i=1>> wrote:
>>>>
>>>>> Hi CVP,
>>>>>
>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <[hidden email]
>>>>> <http:///user/SendEmail.jtp?type=node&node=9176&i=2>>:
>>>>>
>>>>>> Hi Aljoscha & Fabian,
>>>>>>
>>>>>>     I have a stream application that has 2 stream source as below.
>>>>>>
>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*") ;
>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>
>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>      //X is a CoFlatMapFunction that inserts and removes elements
>>>>>> from ks2 into a key-value state member. Elements from ks1 are matched
>>>>>> against that state. the CoFlatMapFunction operator maintains
>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>
>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>      //ks2 is streaming about 1 event every 10 minutes... Precisely
>>>>>> when the 1st event is consumed from this stream, checkpoint takes 2 
>>>>>> minutes
>>>>>> straight away.
>>>>>>
>>>>>>     The version of flink is 1.1.2.
>>>>>>
>>>>>> I tried to use checkpoint every 10 Secs using a FsStateBackend...
>>>>>> What I notice is that the checkpoint duration is almost 2 minutes for 
>>>>>> many
>>>>>> cases, while for the other cases it varies from 100 ms to 1.5 minutes
>>>>>> frequently. I'm attaching the snapshot of the dashboard for your 
>>>>>> reference.
>>>>>>
>>>>>>      Is this an issue with flink checkpointing?
>>>>>>
>>>>>>  Best Regards
>>>>>> CVP
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>> *flink_job_Plan.png* (42K) Download Attachment
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>> *Flink-Checkpoint-Times.png* (65K) Download Attachment
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>> *flink-qchavar-taskmanager-1-elxa1h67k32.log* (442K) Download Attachment
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>
>>>
>>> ------------------------------
>>> If you reply to this email, your message will be added to the discussion
>>> below:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9176.html
>>> To start a new topic under Apache Flink User Mailing List archive.,
>>> email [hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=9179&i=1>
>>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>>> NAML
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>
>>
>>
>> ------------------------------
>> View this message in context: Re: Flink Checkpoint runs slow for low
>> load stream
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9179.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-
> Checkpoint-runs-slow-for-low-load-stream-tp9147p9181.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1...@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
> .
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9182.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to