Hi Stefan,

Sorry for providing partial information. The attachment is the full logs
for checkpoint #1577.

The reason I say the asynchronous part does not seem to have been executed
immediately is that all of the synchronous parts had finished at 2017-09-27
13:49.
Does that mean the checkpoint barrier event had already arrived at the
operator, and the checkpoint started there, as soon as the JM triggered it?
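
As a rough check of the timing, here is a minimal Python sketch that uses
only the timestamps from the JM and TM logs quoted below. The variable and
helper names are my own, chosen purely for illustration, and it assumes the
TM log line was written right when the asynchronous part finished. It works
backwards from that line to estimate when the asynchronous part started:

```python
from datetime import datetime, timedelta

# Timestamp layout used in the quoted Flink log lines.
LOG_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse(ts):
    """Parse a log timestamp such as '2017-09-27 13:49:42,795'."""
    return datetime.strptime(ts, LOG_FORMAT)


def to_ms(delta):
    """Convert a timedelta to whole milliseconds."""
    return delta // timedelta(milliseconds=1)


# JM log: "Triggering checkpoint 1577"
triggered = parse("2017-09-27 13:49:42,795")
# TM log: "... asynchronous part) ... took 240248 ms."
async_done = parse("2017-09-27 13:56:37,105")
async_duration = timedelta(milliseconds=240248)

# Work backwards to the estimated start of the asynchronous part.
async_started = async_done - async_duration
wait_before_async = async_started - triggered
end_to_end = async_done - triggered

print("async part started at about:", async_started.time())
print("ms between trigger and async start:", to_ms(wait_before_async))
print("ms between trigger and async finish:", to_ms(end_to_end))
```

By this estimate the asynchronous part started at about 13:52:36, roughly
174 seconds after checkpoint #1577 was triggered, and trigger-to-finish is
about 414 seconds, which is consistent with the almost 7 minutes seen on the
JM side.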

Best Regards,
Tony Wei

2017-09-28 18:22 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> I agree that the memory consumption looks good. If there is only one TM,
> it will run inside one JVM. As for the 7 minutes, you mean the reported
> end-to-end time? This time measurement starts when the checkpoint is
> triggered on the job manager. The first contributor is then the time that
> it takes for the checkpoint barrier event to travel with the stream to the
> operators. If there is back pressure and a lot of events are buffered, this
> can introduce delay to this first part, because barriers must not overtake
> data for correctness. After the barrier arrives at the operator, next comes
> the synchronous part of the checkpoint, which is typically short running
> and takes a snapshot of the state (think of creating an immutable version,
> e.g. through copy on write). In the asynchronous part, this snapshot is
> persisted to DFS. After that the timing stops and is reported together with
> the acknowledgement to the job manager.
>
> So, I would assume if reporting took 7 minutes end-to-end, and the async
> part took 4 minutes, it is likely that it took around 3 minutes for the
> barrier event to travel with the stream. About the debugging, I think it is
> hard to figure out what is going on with the DFS if you don’t have metrics
> on that. Maybe you could attach a sampler to the TM’s JVM and monitor where
> time is spent for the snapshotting?
>
> I am also looping in Stephan, he might have more suggestions.
>
> Best,
> Stefan
>
> On 28.09.2017 at 11:25, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Stefan,
>
> Here is some telemetry information, but I don't have any historical GC
> information.
>
> <image: 2017-09-28 4.51.26 PM.png>
> <image: 2017-09-28 4.51.11 PM.png>
>
> 1) Yes, my state is not large.
> 2) My DFS is S3, but my cluster is outside AWS. That might be a problem.
> Since this is a POC, we might move to AWS in the future or use HDFS in the
> same cluster. However, how can I tell whether this is the problem?
> 3) Memory usage seems to be bounded. I'm not sure whether the status shown
> above is fine.
>
> There is only one TM in my cluster for now, so all tasks are running on
> that machine. I think that means they are in the same JVM, right?
> Besides the asynchronous part taking so long, I have another question: the
> late message showed that this task was delayed for almost 7 minutes, but
> the log showed it only took 4 minutes.
> It seems that it was somehow waiting to be executed. Are there any pointers
> for finding out what happened?
>
> As for the log information, what I mean is that it is hard to tell which
> checkpoint id an asynchronous part belongs to if checkpoints take more
> time and several concurrent checkpoints are in progress.
> Also, it seems that the asynchronous part might not be executed right away
> if no thread is available in the thread pool. It would be better to measure
> the time between creation and processing, and to log it, together with the
> checkpoint id, in the original log line that shows how long the
> asynchronous part took.
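
A minimal sketch of the kind of logging suggested above, written in Python
rather than Flink's own Java and not based on Flink's actual code. The class
`TimedSnapshotTask`, the function `run_demo`, the single-thread pool and the
sleep durations are all assumptions made up for illustration. Each task
remembers when it was created, so the time spent queued in the thread pool
can be logged next to the checkpoint id and the run time:

```python
import time
from concurrent.futures import ThreadPoolExecutor


class TimedSnapshotTask:
    """Hypothetical wrapper that records its creation time for later logging."""

    def __init__(self, checkpoint_id, work):
        self.checkpoint_id = checkpoint_id
        self.work = work
        self.created = time.monotonic()  # when the task was handed over

    def __call__(self):
        started = time.monotonic()       # when a pool thread picked it up
        self.work()
        finished = time.monotonic()
        wait_ms = (started - self.created) * 1000
        run_ms = (finished - started) * 1000
        print(f"checkpoint {self.checkpoint_id}: asynchronous part waited "
              f"{wait_ms:.0f} ms in the pool and took {run_ms:.0f} ms")
        return self.checkpoint_id, wait_ms, run_ms


def run_demo():
    # One worker thread, so the second task has to queue behind the first.
    with ThreadPoolExecutor(max_workers=1) as pool:
        first = pool.submit(TimedSnapshotTask(1577, lambda: time.sleep(0.2)))
        second = pool.submit(TimedSnapshotTask(1578, lambda: time.sleep(0.05)))
        return first.result(), second.result()


if __name__ == "__main__":
    run_demo()
```

With a log line like this, a long gap between trigger and completion could
be attributed either to waiting for a free thread or to the write itself,
even when several checkpoints are in flight at once.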
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> when the async part takes that long I would have 3 things to look at:
>>
>> 1) Is your state so large? I don’t think this applies in your case, right?
>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>> 3) Are we running low on memory on that task manager?
>>
>> Do you have telemetry information about used heap and gc pressure on the
>> problematic task? However, what speaks against the memory problem
>> hypothesis is that future checkpoints seem to go through again. What I find
>> very strange is that within the reported 4 minutes of the async part the
>> only thing that happens is: open DFS output stream, iterate the in-memory
>> state and write serialized state data to DFS stream, then close the stream.
>> No locks or waits in that section, so I would assume that for one of the
>> three reasons I gave, writing the state is terribly slow.
>>
>> Those snapshots should be able to run concurrently, for example so that
>> users can also take savepoints even when a checkpoint was triggered and is
>> still running, so there is no way to guarantee that the previous parts have
>> finished; this is expected behaviour. Which waiting times are you missing
>> in the log? I think the information about when a checkpoint is triggered,
>> received by the TM, performing the sync and async part and acknowledgement
>> time should all be there?
>>
>> Best,
>> Stefan
>>
>>
>>
>> On 28.09.2017 at 08:18, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> The checkpoint on my job has been subsumed again. There are some
>> things that I don't understand.
>>
>> Log in JM :
>> 2017-09-27 13:45:15,686 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>> 2017-09-27 13:49:42,795 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1577 @ 1506520182795
>> 2017-09-27 13:54:42,795 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1578 @ 1506520482795
>> 2017-09-27 13:55:13,105 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>> 2017-09-27 13:56:37,103 WARN 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Received late message for now expired checkpoint attempt 1577 from
>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>> 2017-09-27 13:59:42,795 INFO 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1579 @ 1506520782795
>>
>> Log in TM:
>> 2017-09-27 13:56:37,105 INFO 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend
>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>> Threads] took 240248 ms.
>>
>> I think the log in TM might be the late message for #1577 in JM, because
>> #1576 and #1578 had finished and #1579 hadn't started at 13:56:37.
>> If I am not mistaken, I am wondering why it took 240248 ms (4 min). It
>> seems that it started later than the asynchronous tasks in #1578.
>> Is there any way to guarantee that the asynchronous parts of earlier
>> checkpoints are executed before those of later ones?
>>
>> Moreover, I think it would be better to have more information in the INFO
>> log, such as waiting time and checkpoint id, to make it easier to trace
>> the progress of a checkpoint.
>>
>> What do you think? Do you have any suggestions for dealing with these
>> problems? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>
>>> Hi Stefan,
>>>
>>> Here is the checkpoint summary for my streaming job since I restarted
>>> it last night.
>>>
>>> <image: 2017-09-27 4.56.30 PM.png>
>>>
>>> This is the distribution of alignment buffered over the last 12 hours.
>>>
>>> <image: 2017-09-27 5.05.11 PM.png>
>>>
>>> And here is the output buffer pool usage during chk #1140 ~ #1142. For
>>> chk #1245 and #1246, you can check the picture I sent before.
>>>
>>> <image: 2017-09-27 5.01.24 PM.png>
>>>
>>> AFAIK, the back pressure status is usually LOW, sometimes goes up to
>>> HIGH, and is always OK during the night.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>>
>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>
>>>> Hi Tony,
>>>>
>>>> are your checkpoints typically close to the timeout boundary? From what
>>>> I see, writing the checkpoint is relatively fast but the time from the
>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>> take a long time to travel to the operators, because a lot of events are
>>>> piling up in the buffers. Do you also experience large alignments for your
>>>> checkpoints?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 27.09.2017 at 10:43, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Stefan,
>>>>
>>>> It seems that I found something strange in the JM's log.
>>>>
>>>> This had happened more than once before, but all subtasks would finish
>>>> their checkpoint attempts in the end.
>>>>
>>>> 2017-09-26 01:23:28,690 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>> 2017-09-26 01:28:28,690 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>> 2017-09-26 01:33:28,690 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>> 2017-09-26 01:33:28,691 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1140 expired before completing.
>>>> 2017-09-26 01:38:28,691 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1141 expired before completing.
>>>> 2017-09-26 01:40:38,044 WARN 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:40:53,743 WARN 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:41:19,332 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>
>>>> For chk #1245 and #1246, there was no late message from the TM. You can
>>>> refer to the TM log. A fully completed checkpoint attempt generally has
>>>> 12 (... asynchronous part) log lines, but #1245 and #1246 only got 10.
>>>>
>>>> 2017-09-26 10:08:28,690 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>> 2017-09-26 10:13:28,690 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>> 2017-09-26 10:18:28,691 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1245 expired before completing.
>>>> 2017-09-26 10:23:28,691 INFO 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1246 expired before completing.
>>>>
>>>> Moreover, I listed the checkpoint directory on S3 and saw that two
>>>> states had not been discarded successfully. In general, a completed
>>>> checkpoint state has 16 parts.
>>>>
>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>
>>>> I hope this information is helpful. Thank you.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>> what the reason is from the given information. I think a thread dump
>>>>> would be most helpful, but also metrics on the operator level to figure
>>>>> out which part of the pipeline is the culprit.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 26.09.2017 at 17:55, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> There is no unknown exception in my full log. The Flink version is
>>>>> 1.3.2.
>>>>> My job is roughly like this.
>>>>>
>>>>> env.addSource(Kafka)
>>>>>   .map(ParseKeyFromRecord)
>>>>>   .keyBy()
>>>>>   .process(CountAndTimeoutWindow)
>>>>>   .asyncIO(UploadToS3)
>>>>>   .addSink(UpdateDatabase)
>>>>>
>>>>> It seemed that all tasks stopped, as in the picture I sent in the last
>>>>> email.
>>>>>
>>>>> I will try to take a thread dump from that JVM if this happens again.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> that is very strange indeed. I had a look at the logs and there is no
>>>>>> error or exception reported. I assume there is also no exception in your
>>>>>> full logs? Which version of Flink are you using and what operators were
>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>> possible to take a thread dump from that JVM?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> > On 26.09.2017 at 17:08, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi,
>>>>>> >
>>>>>> > Something weird happened on my streaming job.
>>>>>> >
>>>>>> > I found that my streaming job seemed to be blocked for a long time,
>>>>>> > and I saw the situation shown in the picture below. (chk #1245 and
>>>>>> > #1246 both finished 7/8 tasks and were then marked as timed out by the
>>>>>> > JM. Other checkpoints, like #1247, failed in the same state until I
>>>>>> > restarted the TM.)
>>>>>> >
>>>>>> > <snapshot.png>
>>>>>> >
>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>> > records, buffer usage was 100% and the following task did not seem to
>>>>>> > fetch data anymore. It was as if the whole TM had stopped.
>>>>>> >
>>>>>> > However, after I restarted the TM and forced the job to restart from
>>>>>> > the latest completed checkpoint, everything worked again. I don't know
>>>>>> > how to reproduce it.
>>>>>> >
>>>>>> > The attachment is my TM log. Because it contains many user logs and
>>>>>> > sensitive information, I kept only the log lines from
>>>>>> > `org.apache.flink...`.
>>>>>> >
>>>>>> > My cluster setup is one JM and one TM with 4 available slots.
>>>>>> >
>>>>>> > The streaming job uses all slots, the checkpoint interval is 5 minutes
>>>>>> > and the maximum number of concurrent checkpoints is 3.
>>>>>> >
>>>>>> > Please let me know if more information is needed to find out what
>>>>>> > happened to my streaming job. Thanks for your help.
>>>>>> >
>>>>>> > Best Regards,
>>>>>> > Tony Wei
>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>
