Hi Stefan, Thank you very much. I will try to investigate what the problem is with my cluster and S3. BTW, is there any Jira issue associated with your improvement, so that I can track it?
Best Regards,
Tony Wei

2017-10-03 16:01 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> from the stack trace, it seems to me like closing the checkpoint output stream to S3 is the culprit:
>
> "pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000 nid=0x55a2 waiting on condition [0x00007fda092d7000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007154050b8> (a java.util.concurrent.FutureTask)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> *at com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)*
> *at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)*
> - locked <0x00000007154801d0> (a org.apache.hadoop.fs.s3a.S3AOutputStream)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
> - locked <0x0000000715480238> (a org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
> - locked <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> In particular, this holds lock 0x000000073ef55b00, which blocks the next checkpoint in its synchronous phase:
>
> "count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b waiting for monitor entry [0x00007fda0a5e8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
> *- waiting to lock <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)*
> *at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)*
> *at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> - locked <0x000000073ee55068> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> *at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)*
> *at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
> This, in turn, blocks the operator's main processing loop (I marked it further down the trace) and processing stops.
>
> So while I assume that something bad happens with your S3, it is also not nice that this brings down the pipeline in such ways.
>
> I had actually already created a branch that aims to improve (=reduce) the whole locking in the RocksDBKeyedStateBackend to prevent exactly such scenarios:
> Right now, the whole purpose of this lock is protecting the RocksDB instance from getting disposed while concurrent operations, such as checkpoints, are still running. Protecting the RocksDB instance is important because it is a native library and accessing a disposed instance will cause segfaults.
> However, it is actually not required to hold on to the lock all the time. The idea behind my change is to have a synchronized client counter that tracks all ongoing workers that use the RocksDB instance, so only incrementing, decrementing, and checking the counter happens under the lock. Disposing the RocksDB instance can then only start when the "client count" is zero, and after it has started, no new clients can register. So it is similar to reader/writer locking, where all ops on the DB are "readers" and disposing the instance is the "writer".
>
> I am currently on holidays; maybe this small change is quite useful and I will prioritize it a bit when I am back. Nevertheless, I suggest investigating why S3 is behaving like this.
>
> Best,
> Stefan
>
>
> On 03.10.2017 at 07:26, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Stefan,
>
> It seems that a similar situation, in which the job blocked after a checkpoint timeout, has happened to my job. BTW, this is another job, for which I raised the parallelism and the input throughput.
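For illustration only, a minimal sketch of the counting scheme Stefan describes might look like the class below. The class and method names are hypothetical and are not taken from his branch or from Flink; it only shows the idea that the lock is held just for counter updates, while disposal waits until no worker uses the RocksDB instance anymore.

    // Hypothetical sketch of the client-counting idea described above; not the actual
    // code from Stefan's branch. Workers register before using the RocksDB instance and
    // unregister when done. Disposal waits until the client count is zero and then
    // blocks new registrations, so the lock is only ever held for the counter updates.
    public final class NativeResourceGuard {

        private final Object lock = new Object();
        private int clientCount = 0;
        private boolean disposed = false;

        /** Registers a client; returns false if disposal has already started. */
        public boolean acquire() {
            synchronized (lock) {
                if (disposed) {
                    return false;
                }
                clientCount++;
                return true;
            }
        }

        /** Unregisters a client and wakes up a pending disposal. */
        public void release() {
            synchronized (lock) {
                clientCount--;
                if (clientCount == 0) {
                    lock.notifyAll();
                }
            }
        }

        /** Blocks until all clients are done; afterwards the native instance can be closed safely. */
        public void disposeWhenUnused() throws InterruptedException {
            synchronized (lock) {
                disposed = true; // no new clients can register from this point on
                while (clientCount > 0) {
                    lock.wait();
                }
            }
            // the caller can now dispose the RocksDB instance without risking a segfault
        }
    }

A snapshot worker would call acquire() before reading from RocksDB and release() in a finally block; with that, a slow S3 close in the asynchronous part no longer holds the lock that the next checkpoint's synchronous phase needs.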
> After chk #8 started, the whole operator seemed blocked.
>
> I recorded some JM / TM logs, snapshots and thread dump logs, which are in the attachment. Hope these will help to find the root cause. Thank you.
>
> Best Regards,
> Tony Wei
>
> ======================================================================================================================================================
>
> JM log:
>
> 2017-10-03 03:46:49,371 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 7 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
> 2017-10-03 03:47:00,977 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 8 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>
> TM log:
>
> 2017-10-03 03:46:46,962 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72, asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads] took 1211517 ms.
>
> Snapshots:
>
> <Screenshot 2017-10-03 12.20.11 PM.png>
> <Screenshot 2017-10-03 12.22.10 PM.png>
>
> 2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>
>> Hi Stefan,
>>
>> That reason makes sense to me. Thanks for pointing that out.
>>
>> About my job: the database is currently not used at all, I disabled it for some reasons, but the output to S3 is implemented with async I/O.
>>
>> I used a ForkJoinPool with a capacity of 50.
>> I have tried to rebalance after the count window to monitor the back pressure on the upload operator.
>> The result always shows OK status.
>> I think the reason is that the count window buffered lots of records, so the input rate in the upload operator was not too high.
>>
>> But I am not sure whether my ForkJoinPool capacity setting would impact the asynchronous checkpointing process, in terms of both the machine's resources and the S3 connections.
>>
>> BTW, S3 serves both the operator and checkpointing, and I used the AWS Java API to access S3 in the upload operator in order to control where the files go.
>>
>> Best Regards,
>> Tony Wei
>>
>> On Thu, Sep 28, 2017 at 7:43 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after another async part finished.
>>>
>>> I have one more question about your job, because it involves communication with external systems, like S3 and a database. Are you sure that they cannot sometimes become a bottleneck, block, and bring down your job? In particular: is the same S3 used to serve the operator and checkpointing, and what is your sustained read/write rate there and the maximum number of connections? You can try to use the backpressure metric and try to identify the first operator (counting from the sink) that indicates backpressure.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 28.09.2017 at 12:59, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Sorry. This is the correct one.
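To make the async I/O setup discussed above concrete, here is a rough sketch of how such an upload stage is typically wired with the Flink 1.3 async I/O API. The class name, the String element types and the 60 second timeout are assumptions for illustration; only the pool size of 50 comes from the thread. Note that the capacity passed to AsyncDataStream merely bounds the number of in-flight requests per subtask; the ForkJoinPool doing the uploads and the checkpointing thread are separate, but they share the machine's CPU, network bandwidth and S3 connection budget.

    import java.util.Collections;
    import java.util.concurrent.ForkJoinPool;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

    public class S3UploadFunction extends RichAsyncFunction<String, String> {

        // pool that performs the blocking AWS SDK calls; capacity 50 as in the discussion
        private transient ForkJoinPool uploadPool;

        @Override
        public void open(Configuration parameters) throws Exception {
            uploadPool = new ForkJoinPool(50);
        }

        @Override
        public void asyncInvoke(String batchKey, AsyncCollector<String> collector) throws Exception {
            uploadPool.submit(() -> {
                // upload the buffered batch for this key via the AWS Java API here,
                // then hand the result back to Flink
                collector.collect(Collections.singleton(batchKey));
            });
        }

        @Override
        public void close() throws Exception {
            if (uploadPool != null) {
                uploadPool.shutdown();
            }
        }
    }

    // Wiring, e.g.: AsyncDataStream.unorderedWait(windowed, new S3UploadFunction(),
    //                                             60, TimeUnit.SECONDS, 50)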
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Sorry for providing partial information. The attachment contains the full logs for checkpoint #1577.
>>>>
>>>> The reason I would say that the asynchronous part was not executed immediately is that all synchronous parts had already finished at 2017-09-27 13:49.
>>>> Did that mean the checkpoint barrier event had already arrived at the operator and started as soon as the JM triggered the checkpoint?
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager; the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager.
>>>>>
>>>>> So, I would assume if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don't have metrics on that. Maybe you could attach a sampler to the TM's JVM and monitor where time is spent for the snapshotting?
>>>>>
>>>>> I am also looping in Stephan, he might have more suggestions.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 28.09.2017 at 11:25, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Here is some telemetry information, but I don't have historical information about GC.
>>>>>
>>>>> <Screenshot 2017-09-28 4.51.26 PM.png>
>>>>> <Screenshot 2017-09-28 4.51.11 PM.png>
>>>>>
>>>>> 1) Yes, my state is not large.
>>>>> 2) My DFS is S3, but my cluster is outside AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I tell whether this is the problem?
>>>>> 3) It seems memory usage is bounded. I'm not sure if the status shown above is fine.
>>>>>
>>>>> There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
>>>>> Besides the asynchronous part taking so long, there is another question: the late message showed that this task was delayed for almost 7 minutes, but the log showed it only took 4 minutes.
>>>>> It seems that it was somehow waiting to be executed. Are there some pointers to find out what happened?
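Regarding the "somehow waiting to be executed" observation above: one way to make that wait visible would be to wrap the asynchronous work and log the queue time together with the checkpoint id, roughly as sketched below. This is purely illustrative; TimedCheckpointTask is not an existing Flink class.

    import java.util.concurrent.Callable;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class TimedCheckpointTask<T> implements Callable<T> {

        private static final Logger LOG = LoggerFactory.getLogger(TimedCheckpointTask.class);

        private final long checkpointId;
        private final long creationTime = System.currentTimeMillis();
        private final Callable<T> asyncPart;

        public TimedCheckpointTask(long checkpointId, Callable<T> asyncPart) {
            this.checkpointId = checkpointId;
            this.asyncPart = asyncPart;
        }

        @Override
        public T call() throws Exception {
            long start = System.currentTimeMillis();
            // how long the task sat in the thread pool queue before a thread picked it up
            LOG.info("Asynchronous part of checkpoint {} waited {} ms for a free thread.",
                    checkpointId, start - creationTime);
            T result = asyncPart.call();
            LOG.info("Asynchronous part of checkpoint {} took {} ms.",
                    checkpointId, System.currentTimeMillis() - start);
            return result;
        }
    }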
>>>>>
>>>>> For the log information, what I mean is that it is hard to recognize which checkpoint id the asynchronous parts belong to if the checkpoint takes more time and more concurrent checkpoints are taking place.
>>>>> Also, it seems that the asynchronous part might not be executed right away if there is no free resource in the thread pool. It would be better to measure the time between creation and processing, and to log it together with the checkpoint id alongside the original log line that shows how long the asynchronous part took.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> when the async part takes that long I would have 3 things to look at:
>>>>>>
>>>>>> 1) Is your state so large? I don't think this applies in your case, right?
>>>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>>>> 3) Are we running low on memory on that task manager?
>>>>>>
>>>>>> Do you have telemetry information about used heap and GC pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open the DFS output stream, iterate the in-memory state and write serialized state data to the DFS stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.
>>>>>>
>>>>>> Those snapshots should be able to run concurrently, for example so that users can also take savepoints even when a checkpoint was triggered and is still running. So there is no way to guarantee that the previous parts have finished; this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part, and the acknowledgement time should all be there?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> On 28.09.2017 at 08:18, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
>>>>>>
>>>>>> Log in JM:
>>>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>>>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>>>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>>>>>>
>>>>>> Log in TM:
>>>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>>>>>
>>>>>> I think the log in the TM might be the late message for #1577 in the JM, because #1576 and #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>>>>> If I am not mistaken, I am wondering why the time it took was 240248 ms (4 min). It seems that it started later than the asynchronous tasks in #1578.
>>>>>> Is there any way to guarantee that the previous asynchronous parts of checkpoints will be executed before the following ones?
>>>>>>
>>>>>> Moreover, I think it would be better to have more information in the INFO log, such as the waiting time and the checkpoint id, in order to trace the progress of a checkpoint conveniently.
>>>>>>
>>>>>> What do you think? Do you have any suggestions for me to deal with these problems? Thank you.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Here is the summary for my streaming job's checkpoints after restarting last night.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 4.56.30 PM.png>
>>>>>>>
>>>>>>> This is the distribution of alignment buffered data from the last 12 hours.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 5.05.11 PM.png>
>>>>>>>
>>>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 5.01.24 PM.png>
>>>>>>>
>>>>>>> AFAIK, the back pressure status is usually LOW, sometimes goes up to HIGH, and is always OK during the night.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Tony,
>>>>>>>>
>>>>>>>> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> On 27.09.2017 at 10:43, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> It seems that I found something strange in the JM's log.
>>>>>>>>
>>>>>>>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>>>>>>>>
>>>>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>>>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>>>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>>>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>>>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>>>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>>>
>>>>>>>> For chk #1245 and #1246, there was no late message from the TM. You can refer to the TM log. A fully completed checkpoint attempt will have 12 "... asynchronous part" logs in general, but #1245 and #1246 only got 10 logs.
>>>>>>>>
>>>>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>>>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>>>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>>>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>>>>>>>
>>>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw that there were two states that were not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>>>>>>>>
>>>>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>>>>
>>>>>>>> Hope this information is helpful. Thank you.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator level, to figure out which part of the pipeline is the culprit.
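Besides running jstack against the TaskManager process, a thread dump like the one Stefan asks for can also be captured from inside the JVM via the standard ThreadMXBean, for example from a small debugging hook. This is only a generic illustration; note that ThreadInfo.toString() truncates very deep stacks, so jstack still gives the most complete picture.

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public final class ThreadDumper {

        /** Returns a dump of all live threads, including owned monitors and synchronizers. */
        public static String dumpAllThreads() {
            ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
            StringBuilder dump = new StringBuilder();
            for (ThreadInfo info : threadBean.dumpAllThreads(true, true)) {
                dump.append(info.toString());
            }
            return dump.toString();
        }
    }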
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> On 26.09.2017 at 17:55, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>>>>>>>> My job is roughly like this:
>>>>>>>>>
>>>>>>>>> env.addSource(Kafka)
>>>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>>>   .keyBy()
>>>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>>>
>>>>>>>>> It seemed that all tasks stopped, as in the picture I sent in the last email.
>>>>>>>>>
>>>>>>>>> I will make sure to take a thread dump from that JVM if this happens again.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of Flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Stefan
>>>>>>>>>>
>>>>>>>>>> > On 26.09.2017 at 17:08, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi,
>>>>>>>>>> >
>>>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>>>> >
>>>>>>>>>> > I found that my streaming job seemed to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 both finished 7/8 tasks and were then marked as timed out by the JM. Other checkpoints failed in the same state, like #1247, until I restarted the TM.)
>>>>>>>>>> >
>>>>>>>>>> > <snapshot.png>
>>>>>>>>>> >
>>>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage was 100%, and the following task did not seem to fetch data anymore. It was as if the whole TM had stopped.
>>>>>>>>>> >
>>>>>>>>>> > However, after I restarted the TM and forced the job to restart from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>>>>>>>>> >
>>>>>>>>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only kept the logs from `org.apache.flink...`.
>>>>>>>>>> >
>>>>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>>>>> >
>>>>>>>>>> > The streaming job uses all slots; the checkpoint interval is 5 mins and the max concurrent number is 3.
>>>>>>>>>> >
>>>>>>>>>> > Please let me know if you need more information to find out what happened to my streaming job. Thanks for your help.
>>>>>>>>>> >
>>>>>>>>>> > Best Regards,
>>>>>>>>>> > Tony Wei
>>>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>
>>> <chk_ 1577.log>
>
> <threaddumps.log>
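For completeness, the checkpoint settings described in this thread (5 minute interval, at most 3 concurrent checkpoints, and a 10 minute timeout, which is inferred from the JM log where checkpoints expire 10 minutes after being triggered) would be configured roughly as follows. This is only a sketch of the standard Flink API calls, not Tony's actual code.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // trigger a checkpoint every 5 minutes
            env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
            // expire a checkpoint attempt if it does not finish within 10 minutes
            env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
            // allow up to 3 checkpoint attempts to be in flight at the same time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);

            // source -> map -> keyBy -> process -> async I/O -> sink would be defined here
        }
    }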