Hi Stefan,
We prepared to run it on local SSDs yesterday. However, as I said, the problem just disappeared. Of course we will still switch to local SSDs, but I'm afraid that I might not be able to reproduce the same situation in both environments to compare the difference.
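For readers following this thread: keeping checkpoint data on S3 while moving RocksDB's local working directories off EBS onto an instance-local SSD is a state backend setting. Below is a minimal sketch in the Scala API, assuming flink-statebackend-rocksdb is on the classpath; the S3 URI and the /mnt/local-ssd/rocksdb mount point are placeholders, not values from this thread.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoints still go to S3 (placeholder URI), but RocksDB's working files
    // are kept on a locally attached SSD instead of the EBS-backed volume.
    val backend = new RocksDBStateBackend("s3://my-bucket/flink-checkpoints")
    backend.setDbStoragePaths("/mnt/local-ssd/rocksdb") // hypothetical local SSD mount
    env.setStateBackend(backend)

Only RocksDB's live working set moves to the local disk with this change; the checkpoint location itself stays the same, which is what makes the EBS-vs-SSD comparison discussed here possible.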
Best Regards,
Tony Wei

2018-03-09 16:59 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> Hi,
> if processing and checkpointing are stuck in RocksDB, this could indeed hint at a problem with your IO system. The comment about using EBS could be important, as it might be a bad idea from a performance point of view to run RocksDB on EBS; did you ever compare against running it on local SSDs?
> Best,
> Stefan
> On 09.03.2018 at 05:08, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Stefan, Sihua,
> TL;DR: after the experiment, I found that this problem might not be about the s3 filesystem or the network IO to s3. It might be caused by rocksdb and IO performance, but I still can't tell which of them caused this problem.
> To give more specific details, I have to introduce my flink application's design and what I found in the experiment. The disks I used for EC2 are SSDs, but they are EBS volumes.
> Regarding the application: there is only one keyed ProcessFunction with a ValueState holding a scala collection data type, which represents the counts by event and date.
> This operator receives two types of messages: one is an event message and the other is an overwrite-state message.
> When the operator receives an event message, it updates the corresponding value by event and client time and emits the event to the next operator together with the "whole" collection; that's why I used ValueState rather than MapState or ListState.
> When the operator receives an overwrite-state message, it overwrites the corresponding value in the state. This design lets us replay state constructed by new rules.
> Moreover, my key is something like a user cookie, and I have a timer callback to remove out-dated state; for example, I only care about the state from the last 6 months.
> For the experiment, I tried to catch the first failure to find some clues, so I extended the checkpoint interval to a long time and used savepoints. I know a savepoint is not exactly the same as a checkpoint, but I guess the parts that store the state and upload it to the remote filesystem are similar.
> After some savepoints were triggered, I found that the asynchronous part was stuck in the full snapshot operation and the time triggers on that machine were also stuck and blocked the operator from processing elements.
> I recalled that I had replayed lots of state covering 60 days within 4 ~ 5 hours, and the first problem happened during that replay procedure. It is just a coincidence that the callbacks for those replayed keys fired while I was running the experiment.
> However, when I disabled all checkpoints and savepoints to observe whether the massive number of keys accessing rocksdb got stuck, I found the problem had disappeared. Moreover, I rolled back to the original setting under which the checkpoint had got stuck, and the problem hasn't happened again yet.
> In summary, I still can't tell where the problem happened, since the IO performance didn't hit its limit and the problem couldn't be reproduced from the same initial states.
> I have decided to try out incremental checkpoints to reduce this risk. I will reopen this thread with whatever information I can provide if the problem happens again. If you have any suggestions about my implementation that might lead to problems, or about this issue, please advise me.
> Thank you very much for taking the time to look into this issue!! =)
> p.s. the attachment is about the experiment I mentioned above. I didn't record the stack trace, because the only difference was that only the Time Trigger threads were RUNNABLE and the operator was blocked.
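To make the operator described above easier to follow, here is a rough sketch of such a keyed ProcessFunction. The message types, field names and the 6-month retention constant are illustrative assumptions, not the actual code from this job.

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Illustrative message and output types; the real schema is not shown in this thread.
    sealed trait Message { def cookie: String }
    case class Event(cookie: String, event: String, date: String) extends Message
    case class OverwriteState(cookie: String, counts: Map[(String, String), Long]) extends Message
    case class Output(cookie: String, counts: Map[(String, String), Long])

    class CountingFunction extends ProcessFunction[Message, Output] {

      // The whole per-key collection of counts, keyed by (event, date), kept in one
      // ValueState so the complete collection can be emitted with every event.
      private var counts: ValueState[Map[(String, String), Long]] = _

      private val retentionMs = 180L * 24 * 60 * 60 * 1000 // assumed ~6-month retention

      override def open(parameters: Configuration): Unit = {
        counts = getRuntimeContext.getState(new ValueStateDescriptor(
          "counts", createTypeInformation[Map[(String, String), Long]]))
      }

      override def processElement(msg: Message,
                                  ctx: ProcessFunction[Message, Output]#Context,
                                  out: Collector[Output]): Unit = {
        val current = Option(counts.value()).getOrElse(Map.empty[(String, String), Long])
        msg match {
          case Event(cookie, event, date) =>
            // Update one (event, date) entry and emit the whole collection downstream.
            val updated = current.updated((event, date), current.getOrElse((event, date), 0L) + 1)
            counts.update(updated)
            out.collect(Output(cookie, updated))
          case OverwriteState(_, newCounts) =>
            // Replay path: overwrite this key's state with externally rebuilt counts.
            counts.update(newCounts)
        }
        // Register a cleanup timer; simplified, a real job would track the last update time.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + retentionMs)
      }

      override def onTimer(timestamp: Long,
                           ctx: ProcessFunction[Message, Output]#OnTimerContext,
                           out: Collector[Output]): Unit = {
        // Drop state for keys that were not touched within the retention period (simplified).
        counts.clear()
      }
    }

On a keyed stream this would be wired up roughly as stream.keyBy(_.cookie).process(new CountingFunction).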
> Best Regards,
> Tony Wei
> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
>> Hi Tony,
>> I agree with you.
>> Best Regards,
>> Sihua Zhou
>> Sent from NetEase Mail Master
>> On 03/6/2018 15:34, Tony Wei <tony19920...@gmail.com> wrote:
>> Hi Sihua,
>> You are right. The incremental checkpoint might relieve the machines from the high cpu load and make the bad machines recover quickly, but I was wondering why the first checkpoint failed due to timeout. You can see that when the bad machine recovered, the cpu load for each checkpoint was not so high, although there were still peaks during each checkpoint. I think the high cpu load, which might be caused by those timed-out checkpointing threads, is not the root cause. I will use incremental checkpoints eventually, but I will decide whether to change my persistence filesystem after we either find out the root cause or stop the investigation and draw a conclusion in this mailing thread. What do you think?
>> Best Regards,
>> Tony Wei
>> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
>>> Hi Tony,
>>> Sorry for missing the factor of cpu. I found that the "bad tm"'s cpu load is much higher than the "good tm"'s, so I think maybe it is also a reason that could lead to the timeout. Since you were using "full checkpoint", it needs to iterate over all the records in RocksDB with some `if` checks, and when the state is huge this is cpu costly. Let me try to explain the full checkpoint a bit more; it contains two parts.
>>> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint Duration (sync)" on the checkpoint detail page.)
>>> Part 2. Loop over the records of the snapshot, along with some `if` checks to ensure that the data is sent to s3 in the order of the key groups. (This maps to the "Checkpoint Duration (async)".)
>>> So part 2 can be cpu costly and network costly; if the CPU load is too high, then sending the data will slow down, because both happen in a single loop. If cpu is the reason, this phenomenon will disappear if you use incremental checkpoints, because they almost only send data to s3. All in all, for now trying out the incremental checkpoint is the best thing to do, I think.
>>> Best Regards,
>>> Sihua Zhou
>>> Sent from NetEase Mail Master
>>> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
>>> Sent to the wrong mailing list. Forwarding it to the correct one.
>>> ---------- Forwarded message ----------
>>> From: Tony Wei <tony19920...@gmail.com>
>>> Date: 2018-03-06 14:43 GMT+08:00
>>> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
>>> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
>>> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>>> Hi Sihua,
>>> Thanks a lot. I will try to find out the problem from the machines' environment. If you and Stefan have any new suggestions or thoughts, please advise me. Thank you!
>>> Best Regards,
>>> Tony Wei
>>> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
>>>> Hi Tony,
>>>> I think the two things you mentioned can both lead to a bad network.
>>>> But from my side, I think it is more likely that the unstable network environment itself causes the poor network performance, because as far as I know I can't find a reason why flink would slow down the network so much (and even if it does, the effect should not be that large).
>>>> Maybe Stefan could tell us more about that. ;)
>>>> Best Regards,
>>>> Sihua Zhou
>>>> Sent from NetEase Mail Master
>>>> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
>>>> Hi Sihua,
>>>>> Hi Tony,
>>>>> Regarding your question: an average end-to-end checkpoint latency of less than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined by the maximum end-to-end latency (the slowest one); a checkpoint has truly completed only after all tasks' checkpoints have completed.
>>>> Sorry for my poor expression. What I meant is the average duration of "completed" checkpoints, so I guess there are some problems that make some subtasks of the checkpoint take very long, even more than 10 mins.
>>>>> Regarding the problem: after a second look at the info you provided, we can see from the checkpoint detail picture that there is one task that took 4m20s to transfer its snapshot (about 482M) to s3, and there are 4 other tasks that hadn't completed the checkpoint yet. And comparing bad_tm_pic.png with good_tm_pic.png, we can see that the network performance on the "bad tm" is far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is a problem; sometimes it fails to send 500M of data to s3 within 10 minutes. (Maybe you need to check whether the network environment is stable.)
>>>> That is what I am concerned about, because I can't determine whether the stuck checkpoint made the network performance worse, or the degraded network performance made the checkpoint get stuck.
>>>> Although I provided one "bad machine" and one "good machine", that doesn't mean the bad machine is always bad and the good machine is always good. See the attachments.
>>>> All of my TMs have met this problem at least once from last weekend until now. Some machines recovered by themselves and some recovered after I restarted them.
>>>> Best Regards,
>>>> Tony Wei
>>>> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
>>>>> Hi Tony,
>>>>> Regarding your question: an average end-to-end checkpoint latency of less than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined by the maximum end-to-end latency (the slowest one); a checkpoint has truly completed only after all tasks' checkpoints have completed.
>>>>> Regarding the problem: after a second look at the info you provided, we can see from the checkpoint detail picture that there is one task that took 4m20s to transfer its snapshot (about 482M) to s3, and there are 4 other tasks that hadn't completed the checkpoint yet. And comparing bad_tm_pic.png with good_tm_pic.png, we can see that the network performance on the "bad tm" is far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is a problem; sometimes it fails to send 500M of data to s3 within 10 minutes.
>>>>> (Maybe you need to check whether the network environment is stable.)
>>>>> About the solution: I think incremental checkpoints can help you a lot; they only send the new data at each checkpoint. But you are right that if the incremental state size is larger than 500M, it might cause the timeout problem again (because of the bad network performance).
>>>>> Best Regards,
>>>>> Sihua Zhou
>>>>> Sent from NetEase Mail Master
>>>>> On 03/6/2018 13:02, Tony Wei <tony19920...@gmail.com> wrote:
>>>>> Hi Sihua,
>>>>> Thanks for your suggestion. "Incremental checkpoint" is what I will try out next, and I know it will give better performance. However, it might not solve this issue completely, because as I said, the average end-to-end latency of checkpointing is currently less than 1.5 mins, which is far from my timeout configuration. I believe "incremental checkpoint" will reduce the latency and make this issue occur more rarely, but I can't promise it won't happen again if I have bigger state growth in the future. Am I right?
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 2018-03-06 10:55 GMT+08:00 周思华 <summerle...@163.com>:
>>>>>> Hi Tony,
>>>>>> Sorry for jumping in. One thing I want to point out is that from the log you provided it looks like you are using "full checkpoint". This means that the state data that needs to be checkpointed and transferred to s3 will grow over time, and even for the first checkpoint its performance is slower than an incremental checkpoint (because it needs to iterate over all the records in rocksdb using the RocksDBMergeIterator). Maybe you can try out "incremental checkpoint"; it could help you get better performance.
>>>>>> Best Regards,
>>>>>> Sihua Zhou
>>>>>> Sent from NetEase Mail Master
>>>>>> On 03/6/2018 10:34, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>> Hi Stefan,
>>>>>> I see. That explains why the load on the machines grew. However, I think it is not the root cause that led to these consecutive checkpoint timeouts. As I said in my first mail, the checkpointing process usually took 1.5 mins to upload the states, and this operator and the kafka consumer are the only two operators that have state in my pipeline. In the best case, I should never encounter a timeout problem caused only by lots of pending checkpointing threads that have already timed out. Am I right?
>>>>>> Since these logs and stack traces were taken nearly 3 hours after the first checkpoint timeout, I'm afraid we can't actually find the root cause of the first checkpoint timeout from them. Because we are preparing to take this pipeline into production, I was wondering if you could help me find out where the root cause lies: bad machines, or s3, or the flink-s3-presto package, or the flink checkpointing threads. It would be great if we could find it out from the information I provided, and a hypothesis based on your experience is welcome as well. The most important thing is that I have to decide whether I need to change my persistence filesystem or use another s3 filesystem package, because the last thing I want to see is checkpoint timeouts happening very often.
>>>>>> Thank you very much for all your advice.
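Since incremental checkpointing is the recurring suggestion in this thread, here is a minimal sketch of how it is enabled for the RocksDB backend; the checkpoint URI is a placeholder, and the second constructor argument turns on incremental mode.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // `true` enables incremental checkpoints, so each checkpoint mainly uploads
    // newly created RocksDB files instead of iterating over the full state.
    val backend = new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true)
    env.setStateBackend(backend)

This avoids the full-state iteration described as Part 2 earlier in the thread, although, as noted above, a large enough increment can still run into the same timeout.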
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>> 2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>> Hi,
>>>>>>> thanks for all the info. I had a look into the problem and opened https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack trace, you can see that many checkpointing threads are running on your TM for checkpoints that have already timed out, and I think this cascades and slows down everything. It seems that the implementation of some features, like checkpoint timeouts and not failing tasks on checkpointing problems, overlooked that we also need to properly communicate the checkpoint cancellation to all tasks, which was not needed before.
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>> On 05.03.2018 at 14:42, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>> Hi Stefan,
>>>>>>> Here is my checkpointing configuration.
>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>> Interval: 20m 0s
>>>>>>> Timeout: 10m 0s
>>>>>>> Minimum Pause Between Checkpoints: 0ms
>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>> Persist Checkpoints Externally: Enabled (delete on cancellation)
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>> 2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>>> Hi,
>>>>>>>> quick question: what is your exact checkpointing configuration? In particular, what is your value for the maximum parallel checkpoints and the minimum time interval to wait between two checkpoints?
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>> > On 05.03.2018 at 06:34, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>> > Hi all,
>>>>>>>> > Last weekend, my flink job's checkpoints started failing because of timeouts. I have no idea what happened, but I collected some information about my cluster and job. I hope someone can give me advice or hints about the problem that I encountered.
>>>>>>>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each with 4 cores. These machines are ec2 spot instances. The job's parallelism is set to 32, using rocksdb as the state backend and s3 presto as the checkpoint filesystem.
>>>>>>>> > The state's size is nearly 15gb and still grows day by day. Normally, it takes 1.5 mins to finish the whole checkpoint process. The timeout configuration is set to 10 mins.
>>>>>>>> > <chk_snapshot.png>
>>>>>>>> > As the picture shows, not every checkpoint subtask failed because of the timeout, but every machine has at some point failed for all of its subtasks during last weekend. Some machines recovered by themselves and some machines recovered after I restarted them.
>>>>>>>> > I recorded logs, stack traces and snapshots of the machines' status (CPU, IO, Network, etc.) for both a good and a bad machine. If there is a need for more information, please let me know. Thanks in advance.
>>>>>>>> > Best Regards,
>>>>>>>> > Tony Wei.
>>>>>>>> > <bad_tm_log.log><bad_tm_pic.png><bad_tm_stack.log><good_tm_log.log><good_tm_pic.png><good_tm_stack.log>
> <backpressure.png><good_tm.png><bad_tm.png>
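For reference, the checkpointing settings listed in this thread roughly correspond to the following configuration sketch; the job's actual setup code is not shown in the thread, so treat this as an illustration of the listed values only.

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Interval 20m, exactly-once mode.
    env.enableCheckpointing(20 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

    val conf = env.getCheckpointConfig
    conf.setCheckpointTimeout(10 * 60 * 1000)  // Timeout 10m
    conf.setMinPauseBetweenCheckpoints(0)      // Minimum pause between checkpoints 0ms
    conf.setMaxConcurrentCheckpoints(1)        // Maximum concurrent checkpoints 1
    // Persist checkpoints externally, deleted on cancellation.
    conf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

With no minimum pause configured, a new checkpoint can be triggered on schedule even while threads from a timed-out checkpoint are still running, which matches the pile-up of timed-out checkpointing threads described in FLINK-8871 above.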