Hi,

if processing and checkpointing are stuck in RocksDB, this could indeed hint at a problem with your IO system. The comment about using EBS could be important, as it might be a bad idea from a performance point of view to run RocksDB on EBS; did you ever compare against running it on local SSDs?

Best,
Stefan
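For reference, in case it helps with that comparison: the RocksDB backend lets you point its local working directory at an explicit path, e.g. instance-store SSDs instead of EBS volumes. A minimal sketch in Scala; the checkpoint URI and the local path are placeholders, not taken from this job:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // placeholder checkpoint location; the job in this thread checkpoints to S3 via the presto filesystem
    val backend = new RocksDBStateBackend("s3://my-bucket/checkpoints")
    // keep RocksDB's working files on an instance-local SSD instead of the TaskManager's default temp directories
    backend.setDbStoragePath("/mnt/local-ssd/rocksdb")
    env.setStateBackend(backend)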
> On 09.03.2018, at 05:08, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi Stefan, Sihua,
>
> TL;DR: after the experiment, I found that this problem might not be about the s3 filesystem or network io with s3. It might be caused by rocksdb and io performance, but I still can't tell which one caused this problem.
>
> For more specific details, I have to introduce my flink application's design and what I found in the experiment. The disks I used on EC2 are SSDs, but they are EBS volumes.
>
> For the application detail: there is only one keyed ProcessFunction, with a ValueState holding a scala collection data type that represents the counts by event and date.
> This operator receives two types of messages: one is an event message and the other is an overwrite-state message.
> When the operator receives an event message, it updates the corresponding value by event and client time and emits the event to the next operator together with the "whole" collection; that's why I used ValueState, not MapState or ListState.
> When the operator receives an overwrite-state message, it overwrites the corresponding value in the state. This design lets us replay the state constructed by new rules.
> Moreover, my key is something like a user cookie, and I have a timer callback to remove out-dated state; for example, I only care about the state of the last 6 months.
>
> For the experiment, I tried to catch the first failure to find some clues, so I extended the checkpoint interval to a long time and used savepoints instead. I know a savepoint is not exactly the same as a checkpoint, but I guess the parts that store state and upload it to the remote filesystem are similar.
> After some savepoints were triggered, I found that the asynchronous part was stuck in the full snapshot operation, and the time triggers on that machine were also stuck and blocked the operator from processing elements.
> I recalled that I had replayed lots of state for 60 days within 4~5 hours, and the first problem happened during the replay procedure. It was just a coincidence that the callbacks from those replayed keys fired while I was running the experiment.
> However, when I disabled all checkpoints and savepoints to observe whether the massive key accesses to rocksdb got stuck, I found that the problem disappeared. Moreover, I rolled back to the original setting under which the checkpoint got stuck, and the problem hasn't happened again yet.
>
> In summary, I still can't tell where the problem happened, since the io performance didn't hit its limit and the problem couldn't be reproduced from the same initial states.
> I decided to try out incremental checkpoints to reduce this risk. I will reopen this thread with more information when the problem happens again. If you have any suggestions about my implementation that might lead to such problems, or about this issue in general, please advise me.
> Thank you very much for taking the time to look into this issue!! = )
>
> p.s. the attachments are from the experiment I mentioned above. I didn't record the stack trace, because the only difference was that only the Time Trigger threads' state was runnable while the operator was blocked.
>
> Best Regards,
> Tony Wei
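For readers following along, here is a rough sketch of the kind of keyed ProcessFunction described above. All class, field and message names are invented for illustration and are not taken from the actual job, and the timer/clean-up logic is simplified:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // hypothetical message/output types: counts are keyed by (event, date)
    sealed trait Msg { def cookie: String }
    case class EventMsg(cookie: String, event: String, date: String, clientTime: Long) extends Msg
    case class OverwriteMsg(cookie: String, counts: Map[(String, String), Long]) extends Msg
    case class Output(cookie: String, event: String, counts: Map[(String, String), Long])

    class CountingFunction extends ProcessFunction[Msg, Output] {

      // keep state for roughly 6 months
      private val retentionMs = 180L * 24 * 60 * 60 * 1000

      private var counts: ValueState[Map[(String, String), Long]] = _

      override def open(parameters: Configuration): Unit = {
        counts = getRuntimeContext.getState(
          new ValueStateDescriptor("counts", classOf[Map[(String, String), Long]]))
      }

      override def processElement(msg: Msg,
                                  ctx: ProcessFunction[Msg, Output]#Context,
                                  out: Collector[Output]): Unit = {
        val current = Option(counts.value()).getOrElse(Map.empty[(String, String), Long])
        msg match {
          case e: EventMsg =>
            // bump the counter for this (event, date) and emit the whole collection downstream
            val key = (e.event, e.date)
            val updated = current.updated(key, current.getOrElse(key, 0L) + 1)
            counts.update(updated)
            out.collect(Output(e.cookie, e.event, updated))
            // schedule clean-up of state that is no longer of interest
            ctx.timerService().registerEventTimeTimer(e.clientTime + retentionMs)
          case o: OverwriteMsg =>
            // replay path: overwrite the stored values with the externally rebuilt ones
            counts.update(current ++ o.counts)
        }
      }

      override def onTimer(timestamp: Long,
                           ctx: ProcessFunction[Msg, Output]#OnTimerContext,
                           out: Collector[Output]): Unit = {
        // simplified: drop everything for this key; the real job may prune per date instead
        counts.clear()
      }
    }

Because the whole collection lives in a single ValueState value, every update rewrites the entire collection for that key, and a full snapshot copies it as one value.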
> 2018-03-06 17:00 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
>
> I agree with you.
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 15:34, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
>
> You are right. The incremental checkpoint might relieve the machines from the high cpu load and make the bad machines recover quickly, but I was wondering why the first checkpoint failed with a timeout. You can see that when the bad machine recovered, the cpu load for each checkpoint was not so high, although there were still peaks during each checkpoint. I think the high cpu load, which might be caused by those timed-out checkpointing threads, is not the root cause. I will use incremental checkpoints eventually, but I will decide whether to change my persistence filesystem after we find the root cause, or after we stop the investigation and draw a conclusion in this mailing thread. What do you think?
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 15:13 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
>
> Sorry for missing the cpu factor. I found that the "bad tm"'s cpu load is so much higher than the "good tm"'s, so I think maybe it is also a reason that could lead to the timeout. Since you were using "full checkpoints", they need to iterate over all the records in RocksDB with some `if` checks, and when the state is huge this is cpu costly. Let me try to explain the full checkpoint a bit more; it contains two parts.
>
> Part 1. Take a snapshot of RocksDB. (This maps to the "Checkpoint Duration (sync)" on the checkpoint detail page.)
>
> Part 2. Loop over the records of the snapshot, with some `if` checks to ensure that the data is sent to s3 in key-group order. (This maps to the "Checkpoint Duration (async)".)
>
> So part 2 can be cpu costly and network costly; if the cpu load is too high, then sending the data will slow down, because they are in a single loop. If cpu is the reason, this phenomenon will disappear if you use incremental checkpoints, because they almost only send data to s3. All in all, trying out incremental checkpoints is the best thing to do for now, I think.
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 14:45, Tony Wei <tony19920...@gmail.com> wrote:
> Sent to the wrong mailing list. Forwarding it to the correct one.
>
> ---------- Forwarded message ----------
> From: Tony Wei <tony19920...@gmail.com>
> Date: 2018-03-06 14:43 GMT+08:00
> Subject: Re: checkpoint stuck with rocksdb statebackend and s3 filesystem
> To: 周思华 <summerle...@163.com>, Stefan Richter <s.rich...@data-artisans.com>
> Cc: "user-subscr...@flink.apache.org" <user-subscr...@flink.apache.org>
>
> Hi Sihua,
>
> Thanks a lot. I will try to find the problem in the machines' environment. If you and Stefan have any new suggestions or thoughts, please advise me. Thank you!
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 14:34 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
>
> I think the two things you mentioned can both lead to bad network performance. But from my side, I think it is more likely that an unstable network environment caused the poor network performance itself, because as far as I know I can't find a reason why flink would slow down the network so much (and even if it did, the effect should not be that large).
>
> Maybe Stefan could tell more about that. ;)
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
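Regarding the incremental-checkpoint suggestion in the quoted messages above: with the RocksDB backend this is a constructor flag. A minimal sketch; the S3 URI is a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // the second argument enables incremental checkpoints, so only RocksDB files
    // created since the last checkpoint are uploaded instead of the full state
    env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true))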
> On 03/6/2018 14:04, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
>
> > Hi Tony,
> >
> > About your question: an average end-to-end checkpoint latency of less than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined by the max end-to-end latency (the slowest one); a checkpoint is truly completed only after all tasks' checkpoints have completed.
>
> Sorry for my poor wording. What I meant is the average duration of "completed" checkpoints, so I guess there is some problem that makes some checkpoint subtasks take very long, even more than 10 mins.
>
> > About the problem: after a second look at the info you provided, we can see from the checkpoint detail picture that there is one task which took 4m20s to transfer its snapshot (about 482M) to s3, and there are 4 other tasks that didn't complete the checkpoint yet. And from bad_tm_pic.png vs good_tm_pic.png, we can see that on the "bad tm" the network performance is far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is a problem; sometimes it fails to send 500M of data to s3 within 10 minutes. (Maybe you need to check whether the network environment is stable.)
>
> That is what concerns me, because I can't determine whether a stuck checkpoint makes the network performance worse, or worse network performance makes the checkpoint get stuck.
> Although I provided one "bad machine" and one "good machine", that doesn't mean the bad machine is always bad and the good machine is always good. See the attachments.
> All of my TMs have met this problem at least once since last weekend. Some machines recovered by themselves and some recovered after I restarted them.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 13:41 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
>
> About your question: an average end-to-end checkpoint latency of less than 1.5 mins doesn't mean that a checkpoint won't time out. Indeed, it is determined by the max end-to-end latency (the slowest one); a checkpoint is truly completed only after all tasks' checkpoints have completed.
>
> About the problem: after a second look at the info you provided, we can see from the checkpoint detail picture that there is one task which took 4m20s to transfer its snapshot (about 482M) to s3, and there are 4 other tasks that didn't complete the checkpoint yet. And from bad_tm_pic.png vs good_tm_pic.png, we can see that on the "bad tm" the network performance is far lower than on the "good tm" (-15 MB vs -50 MB). So I guess the network is a problem; sometimes it fails to send 500M of data to s3 within 10 minutes. (Maybe you need to check whether the network environment is stable.)
>
> About the solution: I think incremental checkpoints can help you a lot; they will only send the new data at each checkpoint. But you are right, if the incremental state size is bigger than 500M, it might cause the timeout problem again (because of the bad network performance).
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 13:02, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Sihua,
>
> Thanks for your suggestion. "Incremental checkpoint" is what I will try out next, and I know it will give better performance. However, it might not solve this issue completely, because, as I said, the average end-to-end latency of checkpointing is currently less than 1.5 mins, which is far from my timeout configuration.
> I believe "incremental checkpoint" will reduce the latency and make this issue occur more seldom, but I can't promise it won't happen again if I have bigger state growth in the future. Am I right?
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 10:55 GMT+08:00 周思华 <summerle...@163.com>:
> Hi Tony,
>
> Sorry for jumping in. One thing I want to point out is that, from the log you provided, it looks like you are using "full checkpoints". This means that the state data that needs to be checkpointed and transferred to s3 will grow over time, and even for the first checkpoint its performance is slower than an incremental checkpoint (because it needs to iterate over all the records in rocksdb using the RocksDBMergeIterator). Maybe you can try out "incremental checkpoints"; they could help you get better performance.
>
> Best Regards,
> Sihua Zhou
>
> Sent from NetEase Mail Master
>
> On 03/6/2018 10:34, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Stefan,
>
> I see. That explains why the load on the machines grew. However, I think it is not the root cause that led to these consecutive checkpoint timeouts. As I said in my first mail, the checkpointing process usually took 1.5 mins to upload the states, and this operator and the kafka consumer are the only two operators that have state in my pipeline. In the best case, I should never encounter a timeout problem that is caused only by lots of pending checkpointing threads that have already timed out. Am I right?
>
> Since these logs and stack traces were taken nearly 3 hours after the first checkpoint timeout, I'm afraid that we can't actually find the root cause of the first checkpoint timeout. Because we are preparing to put this pipeline into production, I was wondering if you could help me find out where the root cause lies: bad machines, s3, the flink-s3-presto package, or the flink checkpointing threads. It would be great if we can find it from the information I provided, and a hypothesis based on your experience is welcome as well. The most important thing is that I have to decide whether I need to change my persistence filesystem or use another s3 filesystem package, because the last thing I want to see is the checkpoint timeout happening very often.
>
> Thank you very much for all your advice.
>
> Best Regards,
> Tony Wei
>
> 2018-03-06 1:07 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> Hi,
>
> thanks for all the info. I had a look into the problem and opened https://issues.apache.org/jira/browse/FLINK-8871 to fix this. From your stack trace, you can see that many checkpointing threads are running on your TM for checkpoints that have already timed out, and I think this cascades and slows down everything. It seems that the implementation of some features, like checkpoint timeouts and not failing tasks on checkpointing problems, overlooked that we also need to properly communicate the checkpoint cancellation to all tasks, which was not needed before.
>
> Best,
> Stefan
>
>> On 05.03.2018, at 14:42, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> Here is my checkpointing configuration.
>> Checkpointing Mode: Exactly Once
>> Interval: 20m 0s
>> Timeout: 10m 0s
>> Minimum Pause Between Checkpoints: 0ms
>> Maximum Concurrent Checkpoints: 1
>> Persist Checkpoints Externally: Enabled (delete on cancellation)
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-03-05 21:30 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> Hi,
>>
>> quick question: what is your exact checkpointing configuration? In particular, what is your value for the maximum parallel checkpoints and the minimum time interval to wait between two checkpoints?
>>
>> Best,
>> Stefan
>>
>> > On 05.03.2018, at 06:34, Tony Wei <tony19920...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Last weekend, my flink job's checkpoints started failing because of timeouts. I have no idea what happened, but I collected some information about my cluster and job. I hope someone can give me advice or hints about the problem that I encountered.
>> >
>> > My cluster version is flink-release-1.4.0. The cluster has 10 TMs, each with 4 cores. These machines are ec2 spot instances. The job's parallelism is set to 32, using rocksdb as the state backend and s3 presto as the checkpoint filesystem.
>> > The state's size is nearly 15gb and still grows day by day. Normally, it takes 1.5 mins to finish the whole checkpoint process. The timeout configuration is set to 10 mins.
>> >
>> > <chk_snapshot.png>
>> >
>> > As the picture shows, not every checkpoint subtask failed because of the timeout, but every machine has at some point during last weekend failed for all of its subtasks. Some machines recovered by themselves and some machines recovered after I restarted them.
>> >
>> > I recorded logs, stack traces and snapshots of the machines' status (CPU, IO, network, etc.) for both a good and a bad machine. If there is a need for more information, please let me know. Thanks in advance.
>> >
>> > Best Regards,
>> > Tony Wei.
>> > <bad_tm_log.log><bad_tm_pic.png><bad_tm_stack.log><good_tm_log.log><good_tm_pic.png><good_tm_stack.log>
>
> <backpressure.png><good_tm.png><bad_tm.png>
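For completeness, the configuration listed in the quoted message corresponds roughly to the following settings in code; this is only a sketch, and the job may of course set them differently:

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Interval 20m 0s, Checkpointing Mode Exactly Once
    env.enableCheckpointing(20 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE)

    val checkpointConf = env.getCheckpointConfig
    checkpointConf.setCheckpointTimeout(10 * 60 * 1000L)   // Timeout 10m 0s
    checkpointConf.setMinPauseBetweenCheckpoints(0L)       // Minimum Pause Between Checkpoints 0ms
    checkpointConf.setMaxConcurrentCheckpoints(1)          // Maximum Concurrent Checkpoints 1
    // Persist Checkpoints Externally: Enabled (delete on cancellation)
    checkpointConf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

The minimum pause and the maximum concurrent checkpoints are the two values Stefan asked about in the quoted message above.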