Hi Zhijiang,

Thanks a lot for your reasoning! 

I tried setting the checkpoint mode to at-least-once as you suggested, but 
unfortunately the problem remains the same :(
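
In case the details matter, this is roughly how the mode was switched (a 
simplified sketch; the 60-second interval below is just a placeholder, not our 
real setting):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 60s with at-least-once semantics, which skips the
    // barrier alignment (and its buffering) on downstream tasks.
    env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);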

IMHO, if it were caused by barrier alignment, the state size (mainly the buffers 
held during alignment) would be big, right? But actually it isn’t, so we hadn’t 
considered that direction before.

Best,
Paul Lam

> On 28 Feb 2019, at 16:12, zhijiang <wangzhijiang...@aliyun.com> wrote:
> 
> Hi Paul,
> 
> I am not sure whether the task thread is involved in any work during 
> state snapshotting for FsStateBackend. But I have seen another issue which 
> might also cause your problem.
> From your description below, the last task is blocked in 
> `SingleInputGate.getNextBufferOrEvent`, which means the middle task does not 
> have any output or the middle operator does not process records.
> The backpressure is high between the source and the middle task, which results 
> in blocking the source task in `requestBufferBuilder`.
> 
> Based on the above two points, I guess the middle task is waiting for barriers 
> from some source tasks. For the input channels that have already received the 
> barrier, the middle task would not process the following data buffers but 
> just cache them, which backpressures the corresponding sources under 
> credit-based flow control. For the input channels without barriers, if there 
> are also no data buffers, then the middle task would not have any outputs. 
> So I think one hint is to trace why some source tasks emit their barriers 
> late.
> 
> In order to double-check the above analysis, you can change the checkpoint 
> mode from `exactly-once` to `at-least-once`. If the CPU usage and task TPS 
> no longer drop for a period as before, I think we can confirm the above 
> analysis. :)
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From:Paul Lam <paullin3...@gmail.com>
> Send Time: 2019-02-28 (Thursday) 15:17
> To:user <user@flink.apache.org>
> Subject:Flink performance drops when async checkpoint is slow
> 
> Hi,
> 
> I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some 
> transformations and aggregations, and writes to two Kafka topics respectively. 
> Meanwhile, there’s a custom source that periodically pulls configurations for 
> the transformations. The general job graph is as below.
> 
> <Screenshot 2019-02-25 11.24.54.png>
> 
> The job uses FsStateBackend and checkpoints to HDFS, but the HDFS load is 
> unstable, and sometimes the HDFS client reports slow reads and slow 
> waitForAckedSeqno during checkpoints. When that happens, the Flink job’s 
> consume rate drops significantly, and some TaskManagers’ CPU usage drops from 
> about 140% to 1%, with all the task threads on those TaskManagers blocked. This 
> situation lasts from seconds to a minute. We started a parallel job with 
> everything the same except that checkpointing is disabled, and it runs very 
> steadily. But since the checkpointing is asynchronous, I think it should not 
> affect the task threads.
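> 
> For reference, the state backend is set up roughly like this (a simplified 
> sketch; the HDFS path below is a placeholder for our real checkpoint directory):
> 
>     import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // FsStateBackend writing checkpoints to HDFS, with asynchronous snapshots
>     // enabled via the second constructor argument.
>     env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));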
> 
> Here is some additional information that we observed:
> 
> - When the performance drops, jstack shows that the Kafka source and the task 
> right after it are blocked at requesting a memory buffer (with back pressure 
> close to 1), and the last task is blocked at 
> `SingleInputGate.getNextBufferOrEvent`. 
> - The dashboard shows that the amount of data buffered during alignment is less 
> than 10 MB, even when back pressure is high.
> 
> We’ve been struggling with this problem for weeks, and any help is 
> appreciated. Thanks a lot!
> 
> Best,
> Paul Lam
> 
> 
