Hi Zhijiang,

Thanks a lot for your reasoning!

I tried setting the checkpoint mode to at-least-once as you suggested, but unfortunately the problem remains the same :( IMHO, if it were caused by barrier alignment, the state size (mainly the buffers cached during alignment) would be big, right? But actually it's not, so we didn't consider that possibility before.

Best,
Paul Lam

> On Feb 28, 2019, at 16:12, zhijiang <wangzhijiang...@aliyun.com> wrote:
>
> Hi Paul,
>
> I am not sure whether the task thread is involved in any work during
> snapshotting states for FsStateBackend. But I have another experience which
> might also explain your problem.
>
> From your descriptions below, the last task is blocked in
> `SingleInputGate.getNextBufferOrEvent`, which means the middle task does not
> have any outputs, or the middle operator does not process records.
> The backpressure is high between the source and the middle task, which
> results in blocking the source task in `requestBufferBuilder`.
>
> Based on the above two points, I guess the middle task is waiting for barriers
> from some source tasks. For the input channels which have already received the
> barriers, the middle task would not process the following data buffers and
> just caches them, so it would backpressure the corresponding sources
> based on credit-based flow control. For the input channels without barriers,
> if there are also no data buffers, then the middle task would not have any
> outputs. So one hint is to trace why some source tasks emit the barrier
> with a delay.
>
> In order to double-check the above analysis, you can change the checkpoint
> mode from `exactly-once` to `at-least-once`. If the CPU usage and task TPS
> are no longer decreased for a period as before, I think we can confirm the
> above analysis.
> :)
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Paul Lam <paullin3...@gmail.com>
> Send Time: Thu, Feb 28, 2019, 15:17
> To: user <user@flink.apache.org>
> Subject: Flink performance drops when async checkpoint is slow
>
> Hi,
>
> I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does
> some transformations and aggregations, and writes to two Kafka topics
> respectively. Meanwhile, there's a custom source that pulls configurations
> for the transformations periodically. The generic job graph is as below.
>
> <屏幕快照 2019-02-25 11.24.54.png>
>
> The job uses FsStateBackend and checkpoints to HDFS, but HDFS's load is
> unstable, and sometimes the HDFS client reports slow reads and slow
> waitForAckedSeqno during checkpoints. When that happens, the Flink job's
> consumption rate drops significantly, and some taskmanagers' CPU usage drops
> from about 140% to 1%; all the task threads on those taskmanagers are
> blocked. This situation lasts from seconds to a minute. We started a parallel
> job with everything the same except checkpointing disabled, and it runs very
> steadily. But since the checkpointing is asynchronous, I think it should not
> affect the task threads.
>
> There is some additional information that we observed:
>
> - When the performance drops, jstack shows that the Kafka source and the task
> right after it are blocked at requesting a memory buffer (with back pressure
> close to 1), and the last task is blocked at
> `SingleInputGate.getNextBufferOrEvent`.
> - The dashboard shows that the buffered data during alignment is less than
> 10 MB, even when back pressure is high.
>
> We've been struggling with this problem for weeks, and any help is
> appreciated. Thanks a lot!
>
> Best,
> Paul Lam
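For anyone following along: the mode switch Zhijiang suggested, together with explicitly asynchronous FsStateBackend snapshots, is a small change in the job setup. A minimal sketch against the Flink 1.5 DataStream API; the checkpoint interval and the HDFS path are placeholders, not values from the thread:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // AT_LEAST_ONCE skips barrier alignment: downstream tasks keep
        // processing buffers from channels that already delivered a barrier,
        // instead of caching them until all barriers arrive. Interval is a
        // placeholder (60s).
        env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);

        // FsStateBackend with asynchronous snapshots (second constructor
        // argument), so the task thread is not blocked while state is
        // written out to HDFS. The path is a placeholder.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));

        // ... build the job graph and call env.execute(...) as usual.
    }
}
```

If the slowdown persists under at-least-once (as Paul reports above), barrier alignment alone does not explain it, which matches the small buffered-bytes-during-alignment figure from the dashboard.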