Hi Stefan,

Thank you very much. I will try to investigate what the problem is with my
cluster and S3.
BTW, is there any Jira issue associated with your improvement, so that I
can track it?

Best Regards,
Tony Wei

2017-10-03 16:01 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:

> Hi,
>
> from the stack trace, it seems to me like closing the checkpoint output
> stream to S3 is the culprit:
>
> "pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000 nid=0x55a2 waiting on condition [0x00007fda092d7000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007154050b8> (a java.util.concurrent.FutureTask)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> * at com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)*
> * at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)*
> - locked <0x00000007154801d0> (a org.apache.hadoop.fs.s3a.S3AOutputStream)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
> - locked <0x0000000715480238> (a org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
> - locked <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> In particular, this holds lock 0x000000073ef55b00, which blocks the next
> checkpoint in its synchronous phase:
>
> "count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b waiting for monitor entry [0x00007fda0a5e8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
> *- waiting to lock <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)*
> * at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)*
> * at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> - locked <0x000000073ee55068> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> * at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)*
> * at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
> This, in turn, blocks the operator's main processing loop (I marked it
> further down the trace) and processing stops.
>
> So while I assume that something bad is happening with your S3, it is also not
> nice that this brings down the pipeline in such a way.
>
> I had actually already created a branch that aims to improve (= reduce) the
> whole locking in the RocksDBKeyedStateBackend to prevent exactly such
> scenarios:
> Right now, the whole purpose of this lock is to protect the RocksDB
> instance from getting disposed while concurrent operations, such as
> checkpoints, are still running. Protecting the RocksDB instance is
> important because it is a native library, and accessing a disposed instance
> will cause segfaults.
> However, it is actually not required to hold the lock all the time.
> The idea behind my change is to have a synchronized client counter that tracks
> all ongoing workers that use the RocksDB instance, so that only incrementing,
> decrementing, and checking the counter happens under the lock. Disposing
> the RocksDB instance can then only start once the "client count" is zero,
> and after disposal has started, no new clients can register. So it is similar to
> reader/writer locking, where all operations on the DB are "readers" and disposing
> the instance is the "writer".
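>
> In code, the idea is roughly the following (just a sketch to illustrate the scheme, with made-up names, not the actual implementation in my branch):
>
> public class RocksDBDisposalGuard {
>
>     private final Object lock = new Object();
>     private int clientCount = 0;
>     private boolean disposalStarted = false;
>
>     /** A "reader" (e.g. a checkpoint) calls this before using the RocksDB instance. */
>     public boolean tryRegisterClient() {
>         synchronized (lock) {
>             if (disposalStarted) {
>                 return false; // disposal already started, no new clients allowed
>             }
>             clientCount++;
>             return true;
>         }
>     }
>
>     /** A "reader" calls this when it is done with the RocksDB instance. */
>     public void unregisterClient() {
>         synchronized (lock) {
>             clientCount--;
>             lock.notifyAll();
>         }
>     }
>
>     /** The "writer": disposal can only begin once the client count is zero. */
>     public void awaitNoClientsAndStartDisposal() throws InterruptedException {
>         synchronized (lock) {
>             disposalStarted = true; // from now on, no new clients can register
>             while (clientCount > 0) {
>                 lock.wait();
>             }
>         }
>         // now it is safe to dispose the native RocksDB instance:
>         // no client is using it anymore and no new one can register
>     }
> }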
>
> I am currently on holidays; maybe this small change is quite useful, and I
> will prioritize it a bit when I am back. Nevertheless, I suggest
> investigating why S3 is behaving like this.
>
> Best,
> Stefan
>
>
>
> On 03.10.2017, at 07:26, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Stefan,
>
> It seems that a similar situation, in which the job blocked after a checkpoint
> timeout, happened to my job as well. BTW, this is another job, for which I raised
> the parallelism and the input throughput.
>
> After chk #8 started, the whole operator seemed to be blocked.
>
> I recorded some JM / TM logs, snapshots and thread dump logs, which are in the
> attachment. Hope these will help to find the root cause. Thank you.
>
> Best Regards,
> Tony Wei
>
> ============================================================
>
> JM log:
>
> 2017-10-03 03:46:49,371 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Received late message for now expired checkpoint attempt 7 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
> 2017-10-03 03:47:00,977 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Received late message for now expired checkpoint attempt 8 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>
> TM log:
>
> 2017-10-03 03:46:46,962 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Asynchronous RocksDB snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72, asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads] took 1211517 ms.
>
> Snapshots:
>
> <Screenshot 2017-10-03 12.20.11 PM.png>
> <Screenshot 2017-10-03 12.22.10 PM.png>
>
> 2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>
>> Hi Stefan,
>>
>> That reason makes sense to me. Thanks for pointing that out.
>>
>> About my job: the database is currently not used at all; I disabled it for
>> some reasons. The output to S3 is implemented with async I/O.
>>
>> I used a ForkJoinPool with a capacity of 50.
>> I have tried adding a rebalance after the count window to monitor the back pressure
>> on the upload operator.
>> The result is always OK status.
>> I think that is because the count window buffers lots of records,
>> so the input rate into the upload operator is not too high.
>> But I am not sure whether the capacity I set for the ForkJoinPool could
>> impact the asynchronous checkpointing process, both in terms of the machine's
>> resources and the S3 connections.
>>
>> BTW, the same S3 serves both the operator and checkpointing, and I use the AWS
>> Java API to access S3 in the upload operator in order to control where the files go.
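>>
>> For reference, the upload part roughly looks like the following (a simplified sketch of what I described; the bucket name and class layout are made up):
>>
>> import java.io.File;
>> import java.util.concurrent.ForkJoinPool;
>>
>> import com.amazonaws.services.s3.AmazonS3Client;
>> import com.amazonaws.services.s3.transfer.TransferManager;
>>
>> public class S3Uploader {
>>
>>     // capacity 50, shared by all async I/O requests of the upload operator
>>     private final ForkJoinPool pool = new ForkJoinPool(50);
>>     private final TransferManager transferManager = new TransferManager(new AmazonS3Client());
>>
>>     public void uploadAsync(final String key, final File file) {
>>         pool.submit(() -> {
>>             try {
>>                 // blocks one pool thread until the upload to S3 has finished
>>                 transferManager.upload("my-bucket", key, file).waitForUploadResult();
>>             } catch (InterruptedException e) {
>>                 Thread.currentThread().interrupt();
>>             }
>>         });
>>     }
>> }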
>>
>> Best Regards,
>> Tony Wei
>>
>> On Thu, Sep 28, 2017 at 7:43 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> the gap between the sync and the async part does not mean too much. What
>>> happens per task is that all operators go through their sync part, and then
>>> one thread executes all the async parts, one after the other. So if an
>>> async part starts late, this is just because it started only after another
>>> async part finished.
>>>
>>> I have one more question about your job, because it involves
>>> communication with external systems, like S3 and a database. Are you sure
>>> that they cannot sometimes become a bottleneck, block, and bring down your
>>> job? In particular: is the same S3 used to serve the operator and
>>> checkpointing, and what is your sustained read/write rate there and the
>>> maximum number of connections? You can try to use the backpressure metric
>>> to identify the first operator (counting from the sink) that
>>> indicates backpressure.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 28.09.2017, at 12:59, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Sorry. This is the correct one.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Sorry for providing partial information. The attachment contains the full
>>>> logs for checkpoint #1577.
>>>>
>>>> The reason why I would say that the asynchronous part seems not to have been
>>>> executed immediately is that all synchronous parts had already finished at
>>>> 2017-09-27 13:49.
>>>> Does that mean the checkpoint barrier event had already arrived at the
>>>> operator and the checkpoint started as soon as the JM triggered it?
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree that the memory consumption looks good. If there is only one
>>>>> TM, it will run inside one JVM. As for the 7 minutes, you mean the 
>>>>> reported
>>>>> end-to-end time? This time measurement starts when the checkpoint is
>>>>> triggered on the job manager, the first contributor is then the time that
>>>>> it takes for the checkpoint barrier event to travel with the stream to the
>>>>> operators. If there is back pressure and a lot of events are buffered, 
>>>>> this
>>>>> can introduce delay to this first part, because barriers must not overtake
>>>>> data for correctness. After the barrier arrives at the operator, next 
>>>>> comes
>>>>> the synchronous part of the checkpoint, which is typically short running
>>>>> and takes a snapshot of the state (think of creating an immutable version,
>>>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>>>> persisted to DFS. After that the timing stops and is reported together 
>>>>> with
>>>>> the acknowledgement to the job manager.
>>>>>
>>>>> So, I would assume that if reporting took 7 minutes end-to-end, and the
>>>>> async part took 4 minutes, it is likely that it took around 3 minutes for
>>>>> the barrier event to travel with the stream. About the debugging, I think
>>>>> it is hard to figure out what is going on with the DFS if you don’t have
>>>>> metrics on that. Maybe you could attach a sampler to the TM’s JVM and
>>>>> monitor where time is spent for the snapshotting?
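>>>>>
>>>>> For example (just an illustration, and the thread-name filter is an assumption), a small helper running inside the TM's JVM could periodically print what the snapshot threads are doing:
>>>>>
>>>>> import java.lang.management.ManagementFactory;
>>>>> import java.lang.management.ThreadInfo;
>>>>> import java.lang.management.ThreadMXBean;
>>>>>
>>>>> public class SnapshotThreadSampler {
>>>>>
>>>>>     public static void sample() {
>>>>>         ThreadMXBean threads = ManagementFactory.getThreadMXBean();
>>>>>         for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
>>>>>             // the async snapshot parts run in pooled task threads ("pool-*-thread-*")
>>>>>             if (info.getThreadName().startsWith("pool-")) {
>>>>>                 System.out.println(info); // prints thread state plus a (truncated) stack trace
>>>>>             }
>>>>>         }
>>>>>     }
>>>>> }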
>>>>>
>>>>> I am also looping in Stephan, he might have more suggestions.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 28.09.2017, at 11:25, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Here is some telemetry information, but I don't have historical
>>>>> information about GC.
>>>>>
>>>>> <Screenshot 2017-09-28 4.51.26 PM.png>
>>>>> <Screenshot 2017-09-28 4.51.11 PM.png>
>>>>>
>>>>> 1) Yes, my state is not large.
>>>>> 2) My DFS is S3, but my cluster is outside of AWS. It might be a problem.
>>>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>>>> same cluster. However, how can I tell whether this is the problem?
>>>>> 3) It seems memory usage is bounded. I'm not sure if the status shown
>>>>> above is fine.
>>>>>
>>>>> There is only one TM in my cluster for now, so all tasks are running
>>>>> on that machine. I think that means they are in the same JVM, right?
>>>>> Besides the asynchronous part taking so long, there is another question:
>>>>> the late message showed that this task was delayed for almost 7
>>>>> minutes, but the log showed it only took 4 minutes.
>>>>> It seems that it was somehow waiting to be executed. Are there
>>>>> some pointers to find out what happened?
>>>>>
>>>>> Regarding the log information, what I mean is that it is hard to recognize which
>>>>> checkpoint id an asynchronous part belongs to if the checkpoint takes
>>>>> more time and there are more concurrent checkpoints taking place.
>>>>> Also, it seems that the asynchronous part might not be executed right away if
>>>>> there are no free resources in the thread pool. It would be better to measure the
>>>>> time between creation time and processing time, and log it together with the
>>>>> checkpoint id in the original log line that shows how long the asynchronous part took.
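>>>>>
>>>>> Something like the following could record that waiting time together with the checkpoint id (just a rough sketch of what I mean, not actual Flink code):
>>>>>
>>>>> /** Sketch: wraps the async part of a checkpoint to log how long it waited in the thread pool queue. */
>>>>> public class TimedAsyncCheckpointPart implements Runnable {
>>>>>
>>>>>     private final long checkpointId;
>>>>>     private final Runnable asyncPart;
>>>>>     private final long createdAt = System.currentTimeMillis();
>>>>>
>>>>>     public TimedAsyncCheckpointPart(long checkpointId, Runnable asyncPart) {
>>>>>         this.checkpointId = checkpointId;
>>>>>         this.asyncPart = asyncPart;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void run() {
>>>>>         long waitedMs = System.currentTimeMillis() - createdAt;
>>>>>         System.out.println("Checkpoint " + checkpointId + ": asynchronous part waited " + waitedMs + " ms before starting.");
>>>>>         asyncPart.run();
>>>>>     }
>>>>> }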
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> when the async part takes that long I would have 3 things to look at:
>>>>>>
>>>>>> 1) Is your state so large? I don’t think this applies in your case,
>>>>>> right?
>>>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>>>> 3) Are we running low on memory on that task manager?
>>>>>>
>>>>>> Do you have telemetry information about used heap and gc pressure on
>>>>>> the problematic task? However, what speaks against the memory problem
>>>>>> hypothesis is that future checkpoints seem to go through again. What I 
>>>>>> find
>>>>>> very strange is that within the reported 4 minutes of the async part the
>>>>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>>>>> state and write serialized state data to dfs stream, then close the 
>>>>>> stream.
>>>>>> No locks or waits in that section, so I would assume that for one of the
>>>>>> three reasons I gave, writing the state is terribly slow.
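>>>>>>
>>>>>> Conceptually, the async part does little more than this (a heavily simplified illustration; the real code goes through Flink's checkpoint stream factory and the state backend's serializers):
>>>>>>
>>>>>> import java.io.OutputStream;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> // Heavily simplified view of the async snapshot part: open the stream, write the state, close.
>>>>>> public class SnapshotWriter {
>>>>>>
>>>>>>     public void writeSnapshot(OutputStream checkpointStream, Map<byte[], byte[]> stateSnapshot) throws Exception {
>>>>>>         try {
>>>>>>             for (Map.Entry<byte[], byte[]> entry : stateSnapshot.entrySet()) {
>>>>>>                 checkpointStream.write(entry.getKey());   // serialized key
>>>>>>                 checkpointStream.write(entry.getValue()); // serialized value
>>>>>>             }
>>>>>>         } finally {
>>>>>>             // closing the stream completes the write to the DFS (e.g. finishes the upload)
>>>>>>             checkpointStream.close();
>>>>>>         }
>>>>>>     }
>>>>>> }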
>>>>>>
>>>>>> Those snapshots should be able to run concurrently, for example so
>>>>>> that users can also take savepoints even when a checkpoint was triggered
>>>>>> and is still running, so there is no way to guarantee that the previous
>>>>>> parts have finished; this is expected behaviour. Which waiting times are
>>>>>> you missing in the log? I think the information about when a checkpoint is
>>>>>> triggered, when it is received by the TM, the sync and async parts, and the
>>>>>> acknowledgement time should all be there?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 28.09.2017, at 08:18, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> The checkpoint on my job has been subsumed again. There are some
>>>>>> questions that I don't understand.
>>>>>>
>>>>>> Log in JM :
>>>>>> 2017-09-27 13:45:15,686 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>>> 2017-09-27 13:49:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1577 @ 1506520182795
>>>>>> 2017-09-27 13:54:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1578 @ 1506520482795
>>>>>> 2017-09-27 13:55:13,105 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>>> 2017-09-27 13:56:37,103 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>>> 2017-09-27 13:59:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1579 @ 1506520782795
>>>>>>
>>>>>> Log in TM:
>>>>>> 2017-09-27 13:56:37,105 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend  - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>>>>>
>>>>>> I think the log in the TM might correspond to the late message for #1577 in the JM,
>>>>>> because #1576 and #1578 had already finished and #1579 had not yet been started at
>>>>>> 13:56:37.
>>>>>> If I am not mistaken, I am wondering why the time it took was 240248 ms (4 min).
>>>>>> It seems that it started later than the asynchronous tasks of #1578.
>>>>>> Is there any way to guarantee that the asynchronous parts of earlier
>>>>>> checkpoints will be executed before the following ones?
>>>>>>
>>>>>> Moreover, I think it would be better to have more information in the INFO
>>>>>> log, such as the waiting time and the checkpoint id, in order to trace the
>>>>>> progress of a checkpoint conveniently.
>>>>>>
>>>>>> What do you think? Do you have any suggestions on how to deal with
>>>>>> these problems? Thank you.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Here is the summary of my streaming job's checkpoints after
>>>>>>> restarting it last night.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 4.56.30 PM.png>
>>>>>>>
>>>>>>> This is the distribution of alignment buffered from the last 12
>>>>>>> hours.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 5.05.11 PM.png>
>>>>>>>
>>>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For
>>>>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>>>>
>>>>>>> <Screenshot 2017-09-27 5.01.24 PM.png>
>>>>>>>
>>>>>>> AFAIK, the back pressure status is usually LOW, sometimes
>>>>>>> goes up to HIGH, and is always OK during the night.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>>
>>>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Tony,
>>>>>>>>
>>>>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>>>>> what I see, writing the checkpoint is relatively fast but the time 
>>>>>>>> from the
>>>>>>>> checkpoint trigger to execution seems very long. This is typically the 
>>>>>>>> case
>>>>>>>> if your job has a lot of backpressure and therefore the checkpoint 
>>>>>>>> barriers
>>>>>>>> take a long time to travel to the operators, because a lot of events 
>>>>>>>> are
>>>>>>>> piling up in the buffers. Do you also experience large alignments for 
>>>>>>>> your
>>>>>>>> checkpoints?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> On 27.09.2017, at 10:43, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> It seems that I found something strange in the JM's log.
>>>>>>>>
>>>>>>>> It had happened more than once before, but all subtasks would
>>>>>>>> finish their checkpoint attempts in the end.
>>>>>>>>
>>>>>>>> 2017-09-26 01:23:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1140 @ 1506389008690
>>>>>>>> 2017-09-26 01:28:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1141 @ 1506389308690
>>>>>>>> 2017-09-26 01:33:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1142 @ 1506389608690
>>>>>>>> 2017-09-26 01:33:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 1140 expired before completing.
>>>>>>>> 2017-09-26 01:38:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 1141 expired before completing.
>>>>>>>> 2017-09-26 01:40:38,044 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:40:53,743 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:41:19,332 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>>>
>>>>>>>> For chk #1245 and #1246, there was no late message from the TM. You can
>>>>>>>> refer to the TM log. A fully completed checkpoint attempt will in general have 12
>>>>>>>> "... asynchronous part" log lines, but #1245 and #1246 only got 10 such logs.
>>>>>>>>
>>>>>>>> 2017-09-26 10:08:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1245 @ 1506420508690
>>>>>>>> 2017-09-26 10:13:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1246 @ 1506420808690
>>>>>>>> 2017-09-26 10:18:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 1245 expired before completing.
>>>>>>>> 2017-09-26 10:23:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 1246 expired before completing.
>>>>>>>>
>>>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw
>>>>>>>> there were two states not discarded successfully. In general, there 
>>>>>>>> will be
>>>>>>>> 16 parts for a completed checkpoint state.
>>>>>>>>
>>>>>>>> 2017-09-26 18:08:33      36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>>>> 2017-09-26 18:13:34      37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>>>>
>>>>>>>> Hope this information is helpful. Thank you.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> thanks for the information. Unfortunately, I have no immediate
>>>>>>>>> idea what the reason is from the given information. I think most helpful
>>>>>>>>> would be a thread dump, but also metrics on the operator level to
>>>>>>>>> figure out which part of the pipeline is the culprit.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> On 26.09.2017, at 17:55, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>>>>> 1.3.2.
>>>>>>>>> My job is roughly like this.
>>>>>>>>>
>>>>>>>>> env.addSource(Kafka)
>>>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>>>   .keyBy()
>>>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>>>
>>>>>>>>> It seemed that all tasks stopped, like in the picture I sent in the last
>>>>>>>>> email.
>>>>>>>>>
>>>>>>>>> I will keep this in mind and take a thread dump from that JVM if this
>>>>>>>>> happens again.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> that is very strange indeed. I had a look at the logs and there
>>>>>>>>>> is no error or exception reported. I assume there is also no 
>>>>>>>>>> exception in
>>>>>>>>>> your full logs? Which version of flink are you using and what 
>>>>>>>>>> operators
>>>>>>>>>> were running in the task that stopped? If this happens again, would 
>>>>>>>>>> it be
>>>>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Stefan
>>>>>>>>>>
>>>>>>>>>> > On 26.09.2017, at 17:08, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi,
>>>>>>>>>> >
>>>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>>>> >
>>>>>>>>>> > I found that my streaming job seemed to be blocked for a long time,
>>>>>>>>>> and I saw the situation shown in the picture below. (chk #1245 and #1246 both
>>>>>>>>>> finished 7/8 tasks and were then marked as timed out by the JM. Other checkpoints
>>>>>>>>>> failed in the same state, like #1247, until I restarted the TM.)
>>>>>>>>>> failed
>>>>>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>>>>>> >
>>>>>>>>>> > <snapshot.png>
>>>>>>>>>> >
>>>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>>>>> records, the buffer usage was 100%, and the following task did not seem to fetch
>>>>>>>>>> data anymore. It was as if the whole TM had stopped.
>>>>>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>>>>>> >
>>>>>>>>>> > However, after I restarted the TM and forced the job to restart from
>>>>>>>>>> the latest completed checkpoint, everything worked again. And I don't know
>>>>>>>>>> how to reproduce it.
>>>>>>>>>> how to reproduce it.
>>>>>>>>>> >
>>>>>>>>>> > The attachment is my TM log. Because there are many user logs
>>>>>>>>>> and sensitive information, I only kept the logs from `org.apache.flink...`.
>>>>>>>>>> `org.apache.flink...`.
>>>>>>>>>> >
>>>>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>>>>> >
>>>>>>>>>> > The streaming job uses all slots, the checkpoint interval is 5 mins and
>>>>>>>>>> the max number of concurrent checkpoints is 3.
>>>>>>>>>> >
>>>>>>>>>> > Please let me know if you need more information to find out
>>>>>>>>>> what happened to my streaming job. Thanks for your help.
>>>>>>>>>> >
>>>>>>>>>> > Best Regards,
>>>>>>>>>> > Tony Wei
>>>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>> <chk_ 1577.log>
>>>
>>>
>>>
> <threaddumps.log>
>
>
>
