Sure, I opened Jira FLINK-7757 and this PR:
https://github.com/apache/flink/pull/4764

Best,
Stefan

> On 03.10.2017, at 10:25, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> Thank you very much. I will try to investigate what's the problem on my 
> cluster and S3.
> BTW, Is there any Jira issue associated with your improvement, so that I can 
> track it?
> 
> Best Regards,
> Tony Wei
> 
> 2017-10-03 16:01 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> Hi,
> 
> from the stack trace, it seems to me like closing the checkpoint output 
> stream to S3 is the culprit:
> 
> "pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000 nid=0x55a2 
> waiting on condition [0x00007fda092d7000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000007154050b8> (a 
> java.util.concurrent.FutureTask)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)
>       at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)
>       - locked <0x00000007154801d0> (a 
> org.apache.hadoop.fs.s3a.S3AOutputStream)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>       at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>       at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
>       at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>       at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
>       - locked <0x0000000715480238> (a 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>       - locked <0x000000073ef55b00> (a 
> org.apache.flink.runtime.util.SerializableObject)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
>       at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 
> In particular, this holds lock 0x000000073ef55b00, which blocks the next 
> checkpoint in its synchronous phase:
> 
> "count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer 
> (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b 
> waiting for monitor entry [0x00007fda0a5e8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
>       - waiting to lock <0x000000073ef55b00> (a 
> org.apache.flink.runtime.util.SerializableObject)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>       - locked <0x000000073ee55068> (a java.lang.Object)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
>       at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
>       at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
>       at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:748)
> 
> This, in turn, blocks the operator's main processing loop (I marked it further 
> down in the trace) and processing stops.
> 
> So while I assume that something bad is happening with your S3, it is also not 
> nice that this brings down the pipeline in such a way.
> 
> I had actually already created a branch that aims to improve (i.e. reduce) the 
> locking in the RocksDBKeyedStateBackend to prevent exactly such scenarios: 
> Right now, the whole purpose of this lock is to protect the RocksDB instance 
> from getting disposed while concurrent operations, such as checkpoints, are 
> still running. Protecting the RocksDB instance is important because it is a 
> native library and accessing a disposed instance will cause segfaults. 
> However, it is actually not required to hold the lock all the time. The 
> idea behind my change is to have a synchronized client counter that tracks all 
> ongoing workers that use the RocksDB instance, so only incrementing, 
> decrementing, and checking the counter happens under the lock. Disposing the 
> RocksDB instance can then only start when the "client count" is zero, and 
> once disposal has started, no new clients can register. So it is similar to 
> reader/writer locking, where all operations on the DB are "readers" and 
> disposing the instance is the "writer".
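> 
> To illustrate the idea, here is a rough sketch of such a client-counter guard 
> (class and method names are just made up for this mail, this is not the code 
> from my branch):
> 
> final class NativeResourceGuard {
>     private final Object lock = new Object();
>     private int clientCount = 0;
>     private boolean disposed = false;
> 
>     // Workers (e.g. the async part of a checkpoint) call this before using
>     // the native RocksDB instance.
>     void acquire() {
>         synchronized (lock) {
>             if (disposed) {
>                 throw new IllegalStateException("backend already disposed");
>             }
>             clientCount++;
>         }
>     }
> 
>     // Workers call this in a finally block when they are done with the instance.
>     void release() {
>         synchronized (lock) {
>             clientCount--;
>             lock.notifyAll();
>         }
>     }
> 
>     // Dispose path: blocks new clients, then waits until all ongoing clients are gone.
>     void markDisposedAndAwaitClients() throws InterruptedException {
>         synchronized (lock) {
>             disposed = true;
>             while (clientCount > 0) {
>                 lock.wait();
>             }
>         }
>         // Only now is it safe to dispose the native RocksDB instance.
>     }
> }
> 
> This way, a long-running operation like writing a checkpoint stream to S3 only 
> holds the lock for the short increment/decrement, not for the whole upload.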
> 
> I am currently on holidays, but maybe this small change is quite useful and I 
> will prioritize it a bit when I am back. Nevertheless, I suggest investigating 
> why S3 is behaving like this.
> 
> Best,
> Stefan
> 
> 
> 
>> On 03.10.2017, at 07:26, Tony Wei <tony19920...@gmail.com> wrote:
>> 
>> Hi Stefan,
>> 
>> It seems that a similar situation, in which the job blocked after a checkpoint 
>> timeout, happened to my job as well. BTW, this is another job, for which I 
>> raised the parallelism and the input throughput.
>> 
>> After chk #8 started, the whole operator seemed to be blocked.
>> 
>> I recorded some JM / TM logs, snapshots and thread dump logs, which are in the 
>> attachment. I hope these will help to find the root cause. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> ==========================================================================================================================================================
>> 
>> JM log:
>> 
>> 2017-10-03 03:46:49,371 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received 
>> late message for now expired checkpoint attempt 7 from 
>> b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>> 2017-10-03 03:47:00,977 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received 
>> late message for now expired checkpoint attempt 8 from 
>> b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>> 
>> TM log:
>> 
>> 2017-10-03 03:46:46,962 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Asynchronous RocksDB snapshot (File Stream Factory @ 
>> s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72, 
>> asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads] 
>> took 1211517 ms.
>> 
>> Snapshots:
>> 
>> <Screenshot 2017-10-03 12.20.11 PM.png>
>> <Screenshot 2017-10-03 12.22.10 PM.png>
>> 
>> 2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>> Hi Stefan,
>> 
>> That reason makes sense to me. Thanks for pointing that out. 
>> 
>> About my job: the database is currently not used at all (I disabled it for 
>> some reasons), but the output to S3 was implemented with async I/O. 
>> 
>> I used a ForkJoinPool with a capacity of 50. 
>> I have tried to rebalance after the count window to monitor the back pressure 
>> on the upload operator. 
>> The result is always OK status. 
>> I think the reason is that the count window buffered lots of records, so the 
>> input rate at the upload operator was not too high. 
>> 
>> But I am not sure whether the capacity I set for the ForkJoinPool could 
>> impact the asynchronous checkpointing, in terms of both the machine's 
>> resources and the S3 connections. 
>> 
>> BTW, S3 serves both the operator and checkpointing, and I used the AWS Java 
>> API to access S3 in the upload operator in order to control where the files go. 
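>> 
>> For reference, the async upload stage is wired roughly like this (a simplified 
>> sketch written from memory of the Flink 1.3 async I/O API, so exact class and 
>> package names may differ; the types, timeout and names are placeholders, not 
>> my actual job code):
>> 
>> import java.util.Collections;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.ForkJoinPool;
>> import java.util.concurrent.TimeUnit;
>> 
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
>> import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
>> 
>> // Input/output are plain Strings here just to keep the sketch self-contained.
>> public class UploadToS3 extends RichAsyncFunction<String, String> {
>>     private transient ExecutorService pool;
>> 
>>     @Override
>>     public void open(Configuration parameters) {
>>         pool = new ForkJoinPool(50);  // the 50-capacity pool mentioned above
>>     }
>> 
>>     @Override
>>     public void asyncInvoke(String batch, AsyncCollector<String> collector) {
>>         pool.submit(() -> {
>>             // Put the object to S3 via the AWS SDK here, then hand back the key.
>>             collector.collect(Collections.singleton("s3-key-for-" + batch));
>>         });
>>     }
>> 
>>     // Wiring: the last argument (50) caps the in-flight requests per subtask.
>>     public static DataStream<String> wire(DataStream<String> batches) {
>>         return AsyncDataStream.unorderedWait(
>>                 batches, new UploadToS3(), 60_000, TimeUnit.MILLISECONDS, 50);
>>     }
>> }
>> 
>> So both the pool size and the operator capacity bound how many uploads can be 
>> in flight at once, which is why I wonder whether they compete with the 
>> asynchronous checkpoint uploads for machine resources and S3 connections.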
>> 
>> Best Regards,
>> Tony Wei
>> 
>> On Thu, Sep 28, 2017 at 7:43 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>> Hi,
>> 
>> the gap between the sync and the async part does not mean too much. What 
>> happens per task is that all operators go through their sync part, and then 
>> one thread executes all the async parts, one after the other. So if an async 
>> part starts late, this is just because it started only after another async 
>> part finished.
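>> 
>> As a toy illustration (just a made-up example, not the actual Flink code), the 
>> async parts of one task are essentially queued on a single thread, so a "late" 
>> start only means the previous async part was still running:
>> 
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.TimeUnit;
>> 
>> public class SequentialAsyncParts {
>>     public static void main(String[] args) throws InterruptedException {
>>         ExecutorService asyncOperations = Executors.newSingleThreadExecutor();
>>         long submitted = System.currentTimeMillis();
>>         // async part of checkpoint n, e.g. a slow upload to S3
>>         asyncOperations.submit(() -> sleepQuietly(3_000));
>>         // async part of checkpoint n + 1: it can only start once the first one finished
>>         asyncOperations.submit(() -> System.out.println(
>>                 "started " + (System.currentTimeMillis() - submitted) + " ms after submission"));
>>         asyncOperations.shutdown();
>>         asyncOperations.awaitTermination(1, TimeUnit.MINUTES);
>>     }
>> 
>>     private static void sleepQuietly(long millis) {
>>         try { Thread.sleep(millis); } catch (InterruptedException ignored) { }
>>     }
>> }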
>> 
>> I have one more question about your job, because it involves communication 
>> with external systems, like S3 and a database. Are you sure that they cannot 
>> sometimes become a bottleneck, block, and bring down your job? In 
>> particular: is the same S3 used to serve the operator and checkpointing, and 
>> what is your sustained read/write rate there and the maximum number of 
>> connections? You can try to use the backpressure metric and try to identify 
>> the first operator (counting from the sink) that indicates backpressure.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> On 28.09.2017, at 12:59, Tony Wei <tony19920...@gmail.com> wrote:
>>> 
>> 
>>> Hi,
>>> 
>>> Sorry. This is the correct one.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>> Hi Stefan, 
>>> 
>>> Sorry for providing partial information. The attachment contains the full 
>>> logs for checkpoint #1577.
>>> 
>>> The reason why I said the asynchronous part seemed not to be executed 
>>> immediately is that all synchronous parts had already finished at 2017-09-27 
>>> 13:49.
>>> Does that mean the checkpoint barrier event had already arrived at the 
>>> operator and the checkpoint started as soon as the JM triggered it?
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>> Hi,
>>> 
>>> I agree that the memory consumption looks good. If there is only one TM, it 
>>> will run inside one JVM. As for the 7 minutes, you mean the reported 
>>> end-to-end time? This time measurement starts when the checkpoint is 
>>> triggered on the job manager, the first contributor is then the time that 
>>> it takes for the checkpoint barrier event to travel with the stream to the 
>>> operators. If there is back pressure and a lot of events are buffered, this 
>>> can introduce delay to this first part, because barriers must not overtake 
>>> data for correctness. After the barrier arrives at the operator, next comes 
>>> the synchronous part of the checkpoint, which is typically short running 
>>> and takes a snapshot of the state (think of creating an immutable version, 
>>> e.g. through copy on write). In the asynchronous part, this snapshot is 
>>> persisted to DFS. After that the timing stops and is reported together with 
>>> the acknowledgement to the job manager. 
>>> 
>>> So, I would assume if reporting took 7 minutes end-to-end, and the async 
>>> part took 4 minutes, it is likely that it took around 3 minutes for the 
>>> barrier event to travel with the stream. About the debugging, I think it is 
>>> hard to figure out what is going on with the DFS if you don’t have metrics 
>>> on that. Maybe you could attach a sampler to the TM's JVM and monitor where 
>>> time is spent for the snapshotting?
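>>> 
>>> (Just as an example of what I mean by sampling, nothing Flink-specific: a 
>>> tiny loop based on the standard JDK ThreadMXBean that, if run inside the TM's 
>>> JVM, periodically dumps all thread stacks so you can see where the snapshot 
>>> thread spends its time. Running jstack against the TM process gives you the 
>>> same kind of information.)
>>> 
>>> import java.lang.management.ManagementFactory;
>>> import java.lang.management.ThreadInfo;
>>> 
>>> public class PoorMansSampler {
>>>     public static void main(String[] args) throws InterruptedException {
>>>         while (true) {
>>>             // dumpAllThreads(lockedMonitors, lockedSynchronizers)
>>>             for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
>>>                 // e.g. filter for the async snapshot threads ("pool-...-thread-...")
>>>                 System.out.println(info);
>>>             }
>>>             Thread.sleep(5_000);
>>>         }
>>>     }
>>> }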
>>> 
>>> I am also looping in Stephan, he might have more suggestions.
>>> 
>>> Best,
>>> Stefan
>>> 
>>>> On 28.09.2017, at 11:25, Tony Wei <tony19920...@gmail.com> wrote:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> Here is some telemetry information, but I don't have historical information 
>>>> about GC.
>>>> 
>>>> <Screenshot 2017-09-28 4.51.26 PM.png>
>>>> <Screenshot 2017-09-28 4.51.11 PM.png>
>>>> 
>>>> 1) Yes, my state is not large.
>>>> 2) My DFS is S3, but my cluster is outside AWS. That might be a problem. 
>>>> Since this is a POC, we might move to AWS in the future or use HDFS in the 
>>>> same cluster. However, how can I confirm that this is the problem?
>>>> 3) It seems the memory usage is bounded. I'm not sure if the status shown 
>>>> above is fine.
>>>> 
>>>> There is only one TM in my cluster for now, so all tasks are running on 
>>>> that machine. I think that means they are in the same JVM, right?
>>>> Besides the asynchronous part taking so long, another question is that the 
>>>> late message showed this task was delayed for almost 7 minutes, but the log 
>>>> showed it only took 4 minutes.
>>>> It seems that it was somehow waiting to be executed. Are there any pointers 
>>>> to find out what happened?
>>>> 
>>>> Regarding the log information, what I mean is that it is hard to recognize 
>>>> which checkpoint id the asynchronous parts belong to if a checkpoint takes 
>>>> more time and more concurrent checkpoints are taking place.
>>>> Also, it seems that an asynchronous part might not be executed right away if 
>>>> there is no free thread in the thread pool. It would be better to measure 
>>>> the time between creation and processing, and log it together with the 
>>>> checkpoint id in the original log line that shows how long the asynchronous 
>>>> part took.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>> Hi,
>>>> 
>>>> when the async part takes that long I would have 3 things to look at:
>>>> 
>>>> 1) Is your state so large? I don’t think this applies in your case, right?
>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>> 3) Are we running low on memory on that task manager?
>>>> 
>>>> Do you have telemetry information about used heap and gc pressure on the 
>>>> problematic task? However, what speaks against the memory problem 
>>>> hypothesis is that future checkpoints seem to go through again. What I 
>>>> find very strange is that within the reported 4 minutes of the async part 
>>>> the only thing that happens is: open dfs output stream, iterate the 
>>>> in-memory state and write serialized state data to dfs stream, then close 
>>>> the stream. No locks or waits in that section, so I would assume that for 
>>>> one of the three reasons I gave, writing the state is terribly slow.
>>>> 
>>>> Those snapshots should be able to run concurrently, for example so that 
>>>> users can also take savepoints even when a checkpoint was triggered and is 
>>>> still running, so there is no way to guarantee that the previous parts have 
>>>> finished; this is expected behaviour. Which waiting times are you missing in 
>>>> the log? I think the information about when a checkpoint is triggered and 
>>>> received by the TM, the sync and async parts, and the acknowledgement time 
>>>> should all be there?
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> 
>>>> 
>>>>> On 28.09.2017, at 08:18, Tony Wei <tony19920...@gmail.com> wrote:
>>>>> 
>>>>> Hi Stefan,
>>>>> 
>>>>> The checkpoint on my job has been subsumed again. There are some 
>>>>> questions that I don't understand.
>>>>> 
>>>>> Log in JM :
>>>>> 2017-09-27 13:45:15,686 INFO 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
>>>>> checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>> 2017-09-27 13:49:42,795 INFO 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>> checkpoint 1577 @ 1506520182795
>>>>> 2017-09-27 13:54:42,795 INFO 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>> checkpoint 1578 @ 1506520482795
>>>>> 2017-09-27 13:55:13,105 INFO 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
>>>>> checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>> 2017-09-27 13:56:37,103 WARN 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
>>>>> message for now expired checkpoint attempt 1577 from 
>>>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>> 2017-09-27 13:59:42,795 INFO 
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>> checkpoint 1579 @ 1506520782795
>>>>> 
>>>>> Log in TM:
>>>>> 2017-09-27 13:56:37,105 INFO 
>>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend - 
>>>>> DefaultOperatorStateBackend snapshot (File Stream Factory @ 
>>>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, 
>>>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task 
>>>>> Threads] took 240248 ms.
>>>>> 
>>>>> I think the log in the TM might correspond to the late message for #1577 in 
>>>>> the JM, because #1576 and #1578 had finished and #1579 hadn't started at 
>>>>> 13:56:37.
>>>>> If I am not mistaken, I am wondering why it took 240248 ms (4 min). It 
>>>>> seems that it started later than the asynchronous tasks of #1578.
>>>>> Is there any way to guarantee that the previous asynchronous parts of 
>>>>> checkpoints will be executed before the following ones?
>>>>> 
>>>>> Moreover, I think it would be better to have more information in the INFO 
>>>>> log, such as the waiting time and the checkpoint id, in order to trace the 
>>>>> progress of a checkpoint conveniently.
>>>>> 
>>>>> What do you think? Do you have any suggestion for me to deal with these 
>>>>> problems? Thank you.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>>> Hi Stefan,
>>>>> 
>>>>> Here is the summary of my streaming job's checkpoints after restarting 
>>>>> last night.
>>>>> 
>>>>> <Screenshot 2017-09-27 4.56.30 PM.png>
>>>>> 
>>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>>> 
>>>>> <Screenshot 2017-09-27 5.05.11 PM.png>
>>>>> 
>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk 
>>>>> #1245 and #1246, you can check the picture I sent before.
>>>>> 
>>>>> <Screenshot 2017-09-27 5.01.24 PM.png>
>>>>> 
>>>>> AFAIK, the back pressure status is usually LOW, sometimes goes up to HIGH, 
>>>>> and is always OK during the night.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 
>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>> Hi Tony,
>>>>> 
>>>>> are your checkpoints typically close to the timeout boundary? From what I 
>>>>> see, writing the checkpoint is relatively fast but the time from the 
>>>>> checkpoint trigger to execution seems very long. This is typically the 
>>>>> case if your job has a lot of backpressure and therefore the checkpoint 
>>>>> barriers take a long time to travel to the operators, because a lot of 
>>>>> events are piling up in the buffers. Do you also experience large 
>>>>> alignments for your checkpoints?
>>>>> 
>>>>> Best,
>>>>> Stefan  
>>>>> 
>>>>>> On 27.09.2017, at 10:43, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Stefan,
>>>>>> 
>>>>>> It seems that I found something strange in the JM's log.
>>>>>> 
>>>>>> It had happened more than once before, but all subtasks would finish 
>>>>>> their checkpoint attempts in the end.
>>>>>> 
>>>>>> 2017-09-26 01:23:28,690 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>>> checkpoint 1140 @ 1506389008690
>>>>>> 2017-09-26 01:28:28,690 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>>> checkpoint 1141 @ 1506389308690
>>>>>> 2017-09-26 01:33:28,690 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>>> checkpoint 1142 @ 1506389608690
>>>>>> 2017-09-26 01:33:28,691 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>>>>>> 1140 expired before completing.
>>>>>> 2017-09-26 01:38:28,691 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>>>>>> 1141 expired before completing.
>>>>>> 2017-09-26 01:40:38,044 WARN 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>>>>>> late message for now expired checkpoint attempt 1140 from 
>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:40:53,743 WARN 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>>>>>> late message for now expired checkpoint attempt 1141 from 
>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:41:19,332 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
>>>>>> checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>> 
>>>>>> For chk #1245 and #1246, there was no late message from the TM. You can 
>>>>>> refer to the TM log. A fully completed checkpoint attempt will have 12 
>>>>>> "... asynchronous part" logs in general, but #1245 and #1246 only got 10 
>>>>>> logs.
>>>>>> 
>>>>>> 2017-09-26 10:08:28,690 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>>> checkpoint 1245 @ 1506420508690
>>>>>> 2017-09-26 10:13:28,690 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>>>>>> checkpoint 1246 @ 1506420808690
>>>>>> 2017-09-26 10:18:28,691 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>>>>>> 1245 expired before completing.
>>>>>> 2017-09-26 10:23:28,691 INFO 
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>>>>>> 1246 expired before completing.
>>>>>> 
>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there 
>>>>>> were two states not discarded successfully. In general, there will be 16 
>>>>>> parts for a completed checkpoint state.
>>>>>> 
>>>>>> 2017-09-26 18:08:33 36919 
>>>>>> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>> 2017-09-26 18:13:34 37419 
>>>>>> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>> 
>>>>>> I hope this information is helpful. Thank you.
>>>>>> 
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>> 
>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>> Hi,
>>>>>> 
>>>>>> thanks for the information. Unfortunately, I have no immediate idea what 
>>>>>> the reason is from the given information. I think the most helpful thing 
>>>>>> would be a thread dump, but also metrics on the operator level to figure 
>>>>>> out which part of the pipeline is the culprit.
>>>>>> 
>>>>>> Best,
>>>>>> Stefan
>>>>>> 
>>>>>>> On 26.09.2017, at 17:55, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Stefan,
>>>>>>> 
>>>>>>> There is no unknown exception in my full log. The Flink version is 
>>>>>>> 1.3.2.
>>>>>>> My job is roughly like this.
>>>>>>> 
>>>>>>> env.addSource(Kafka)
>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>   .keyBy()
>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>   .addSink(UpdateDatabase)
>>>>>>> 
>>>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>>> 
>>>>>>> I will keep an eye on it and take a thread dump from that JVM if this 
>>>>>>> happens again.
>>>>>>> 
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>> 
>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> that is very strange indeed. I had a look at the logs and there is no 
>>>>>>> error or exception reported. I assume there is also no exception in 
>>>>>>> your full logs? Which version of flink are you using and what operators 
>>>>>>> were running in the task that stopped? If this happens again, would it 
>>>>>>> be possible to take a thread dump from that JVM?
>>>>>>> 
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>> 
>>>>>>> > On 26.09.2017, at 17:08, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > Something weird happened on my streaming job.
>>>>>>> >
>>>>>>> > I found that my streaming job seemed to be blocked for a long time, and 
>>>>>>> > I saw the situation in the picture below. (chk #1245 and #1246 each 
>>>>>>> > finished 7/8 tasks and were then marked as timed out by the JM. Other 
>>>>>>> > checkpoints failed in the same state as #1247 until I restarted the TM.)
>>>>>>> >
>>>>>>> > <snapshot.png>
>>>>>>> >
>>>>>>> > I'm not sure what happened, but the consumer stopped fetching records, 
>>>>>>> > the buffer usage was 100%, and the downstream task did not seem to 
>>>>>>> > fetch data anymore. It was as if the whole TM had stopped.
>>>>>>> >
>>>>>>> > However, after I restarted the TM and forced the job to restart from 
>>>>>>> > the latest completed checkpoint, everything worked again. And I don't 
>>>>>>> > know how to reproduce it.
>>>>>>> >
>>>>>>> > The attachment is my TM log. Because there are many user logs and 
>>>>>>> > sensitive information, I only kept the logs from 
>>>>>>> > `org.apache.flink...`.
>>>>>>> >
>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>> >
>>>>>>> > The streaming job uses all slots, the checkpoint interval is 5 minutes, 
>>>>>>> > and the maximum number of concurrent checkpoints is 3.
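>>>>>>> >
>>>>>>> > In code, the checkpoint settings look roughly like this (a sketch, not
>>>>>>> > the exact job code; the interval and max concurrency are as described
>>>>>>> > above, and the 10-minute timeout is just the Flink default, shown for
>>>>>>> > completeness):
>>>>>>> >
>>>>>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> >
>>>>>>> > public class CheckpointSettings {
>>>>>>> >     public static void main(String[] args) {
>>>>>>> >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> >         env.enableCheckpointing(5 * 60 * 1000);                          // 5-minute interval
>>>>>>> >         env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);        // at most 3 concurrent
>>>>>>> >         env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);  // expire after 10 minutes
>>>>>>> >     }
>>>>>>> > }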
>>>>>>> >
>>>>>>> > Please let me know if you need more information to find out what 
>>>>>>> > happened to my streaming job. Thanks for your help.
>>>>>>> >
>>>>>>> > Best Regards,
>>>>>>> > Tony Wei
>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
>>> <chk_ 1577.log>
>> 
>> 
>> <threaddumps.log>
> 
> 
