Hi all,

peiliping: I think your idea could be problematic for a couple of reasons. 
A probably minor concern is that checkpoint time is affected not only by back 
pressure, but also by how long it takes to actually perform the checkpoint. 
The bigger issues are that this bottleneck detection would be limited to the 
duration of checkpointing (what if one has checkpoints only once every hour? 
Or none at all?) AND that performance/bottlenecks may change significantly 
during checkpointing (for example, writing the state of the first operator to 
a DFS can indirectly affect downstream operators).

The idea of detecting back pressure/bottlenecks using output/input buffers is 
much more natural, because in the end, almost by definition, if the output 
buffers are full, the given task is back pressured.

Both input and output queue lengths are already exposed via metrics, so 
developers have access to the raw data to manually calculate/detect 
bottlenecks. It would actually be nice to aggregate those metrics 
automatically and provide ready-to-use metrics: boolean flags indicating 
whether a task/stage/job is back pressured or not.
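
For illustration, a minimal sketch of such an aggregation (a hypothetical 
helper, not an existing Flink API; it assumes per-subtask outPoolUsage values 
in the range 0.0-1.0):

    import java.util.Collection;

    /**
     * Hypothetical aggregator: flags a task as back pressured when any of
     * its parallel subtasks reports a completely full output buffer pool.
     */
    public class BackPressureFlag {

        /** outPoolUsage values (0.0 - 1.0), one per parallel subtask. */
        public static boolean isTaskBackPressured(Collection<Double> outPoolUsagePerSubtask) {
            // A full output buffer pool means the subtask cannot emit more
            // data, i.e. it is back pressured by its downstream consumers.
            return outPoolUsagePerSubtask.stream().anyMatch(usage -> usage >= 1.0);
        }

        /** A job is back pressured if any of its tasks is. */
        public static boolean isJobBackPressured(Collection<Collection<Double>> perTaskUsages) {
            return perTaskUsages.stream().anyMatch(BackPressureFlag::isTaskBackPressured);
        }
    }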

Replacing the current back pressure detection mechanism, which probes the 
threads and checks which of them are waiting for buffers, is another issue. 
Functionally it is equivalent to monitoring whether the output queues are 
full. It might be more hacky, but it gives the same results, thus it wasn't 
high on my priority list to change/refactor. It would be nice to clean this 
up a little bit and unify the two approaches, but using metrics can also mean 
some additional work, since there are some known metrics-related performance 
issues.

Piotrek

> On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
> 
> I have some ideas about detecting backpressure (i.e. the blocking operators)
> via the checkpoint barrier.
> 
> I have some Flink jobs with checkpointing, but their checkpoints take a long
> time to complete.
> 
> I need to find the blocking operators, which is the same goal as backpressure
> detection.
> 
> In a checkpoint object I can get a timestamp that marks the start time, and
> then I compute a metric in
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
> 
> The metric is the delta between checkpoint.timestamp and the time when
> StreamTask.executeCheckpointing() is invoked,
> 
> and I named it checkpoint-delay-time.
> 
> It resembles the end-to-end time metric of a checkpoint, but it does not
> include the async handles.
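
For illustration, a minimal sketch of how this metric could be computed 
(hypothetical code, not the actual StreamTask implementation):

    /** Hypothetical sketch of the checkpoint-delay-time described above. */
    public class CheckpointDelay {

        /**
         * checkpointStartTimestamp is the start time carried by the checkpoint
         * (e.g. CheckpointMetaData#getTimestamp()); the current time is taken
         * when StreamTask.executeCheckpointing() is invoked on this task.
         */
        public static long checkpointDelayTimeMs(long checkpointStartTimestamp) {
            return System.currentTimeMillis() - checkpointStartTimestamp;
        }
    }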
> 
> For example, take a list of tasks A (parallelism: 2) ---> B (parallelism: 3)
> ---> C (parallelism: 1):
> 
> Checkpoint-delay-value-A: the max checkpoint-delay-time over A (2 instances)
> 
> Checkpoint-delay-value-B: the max checkpoint-delay-time over B (3 instances)
> 
> Checkpoint-delay-value-C: the max checkpoint-delay-time over C (1 instance)
> 
> Then I can derive three more deltas from the checkpoint-delay-values:
> 
> result-0-->A  = Checkpoint-delay-value-A  -  0
> 
> result-A-->B = Checkpoint-delay-value-B  -  Checkpoint-delay-value-A
> 
> result-B-->C = Checkpoint-delay-value-C  -  Checkpoint-delay-value-B
> 
> Any result-X-->Y longer than 5s (or some other threshold) should be the
> black sheep.
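
For illustration, a minimal sketch of the whole delta analysis (hypothetical 
class and example numbers; the 5s threshold is the one proposed above):

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Hypothetical sketch of the checkpoint-delay analysis described above. */
    public class CheckpointDelayAnalysis {

        private static final long THRESHOLD_MS = 5_000; // 5s, as proposed above

        public static void main(String[] args) {
            // Max checkpoint-delay-time per task (ms), in topological order.
            Map<String, Long> maxDelayPerTask = new LinkedHashMap<>();
            maxDelayPerTask.put("A", 200L);
            maxDelayPerTask.put("B", 7_300L); // barriers reach B much later
            maxDelayPerTask.put("C", 7_500L);

            long previous = 0L;
            for (Map.Entry<String, Long> e : maxDelayPerTask.entrySet()) {
                long delta = e.getValue() - previous; // result-X-->Y
                if (delta > THRESHOLD_MS) {
                    System.out.println("Suspected bottleneck before task "
                            + e.getKey() + " (delta " + delta + " ms)");
                }
                previous = e.getValue();
            }
            // Prints: Suspected bottleneck before task B (delta 7100 ms)
        }
    }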
> 
> On 3 Jan 2019, at 14:43, Yun Gao wrote:
>> Hello liping,
>> 
>>        Thank you for proposing to optimize the backpressure detection! From 
>> our previous experience, we think the InputBufferPoolUsageGauge and 
>> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a 
>> list of tasks A ---> B ----> C, if we find that the OutputBufferPoolUsage 
>> of task A and the InputBufferPoolUsage of task B are 100%, but the 
>> OutputBufferPoolUsage of task B is less than 100%, then it should be 
>> task B that causes the backpressure.
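
For illustration, a minimal sketch of this rule (a hypothetical method; the 
usage values are assumed to be fractions in the range 0.0-1.0):

    /** Hypothetical check for the rule described above. */
    public class BackPressureSourceCheck {

        /**
         * A task is the source of back pressure if its input buffer pool is
         * exhausted while its output buffer pool is not.
         */
        public static boolean causesBackPressure(double inputBufferPoolUsage,
                                                 double outputBufferPoolUsage) {
            // Input at 100%: the task cannot keep up with the data it receives.
            // Output below 100%: downstream is not what is slowing it down.
            return inputBufferPoolUsage >= 1.0 && outputBufferPoolUsage < 1.0;
        }
    }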
>> 
>>       However, we currently think that the InputBufferPoolUsage and 
>> OutputBufferPoolUsage require some modifications to be more accurate:
>>            1. When there are multiple inputs or outputs, the 
>> InputBufferPoolUsage and OutputBufferPoolUsage should show the maximum 
>> usage instead of the average usage [1].
>>          2. Currently the sender side reports the backlog right before 
>> filling up the output buffer. Together with the pre-allocation logic on 
>> the receiver side, the InputBufferPoolUsage may be 100% even if the data 
>> has not been received yet [2].
>> 
>>      We may need to address these problems before adopting 
>> InputBufferPoolUsage and OutputBufferPoolUsage as the backpressure 
>> indicators.
>> 
>>      Besides, another thought along the same lines is that we may also add 
>> new InputBufferUsage and OutputBufferUsage metrics that show (number of 
>> queued buffers / number of all buffers) instead.
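
For illustration, a minimal sketch of such a metric using Flink's Gauge 
interface (the suppliers are hypothetical placeholders for the actual buffer 
pool accessors):

    import java.util.function.IntSupplier;
    import org.apache.flink.metrics.Gauge;

    /** Hypothetical gauge reporting (queued buffers / all buffers). */
    public class InputBufferUsageGauge implements Gauge<Float> {

        private final IntSupplier queuedBuffers; // buffers currently holding data
        private final IntSupplier totalBuffers;  // all buffers in the pool

        public InputBufferUsageGauge(IntSupplier queuedBuffers, IntSupplier totalBuffers) {
            this.queuedBuffers = queuedBuffers;
            this.totalBuffers = totalBuffers;
        }

        @Override
        public Float getValue() {
            int total = totalBuffers.getAsInt();
            return total == 0 ? 0.0f : (float) queuedBuffers.getAsInt() / total;
        }
    }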
>> 
>> 
>>     Best,
>>    Yun Gao
>> 
>> 
>>     [1] https://issues.apache.org/jira/browse/FLINK-10981
>>     [2] https://issues.apache.org/jira/browse/FLINK-11082
>> 
>> 
>> ------------------------------------------------------------------
>> From: 裴立平 <peiliping...@gmail.com>
>> Send Time: 2019 Jan. 3 (Thu.) 13:39
>> To: dev <dev@flink.apache.org>
>> Subject: [DISCUSS] Detection Flink Backpressure
>> 
>> Recently I have wanted to optimize the way we find the positions where
>> backpressure occurs.
>> 
>> I read some blogs about Flink backpressure and have a rough idea of it.
>> 
>> The method Flink adopts is thread-stack sampling; it is heavy-weight and
>> not continuous.
>> 
>> The positions where backpressure occurs are very important to developers.
>> 
>> They should be treated as monitoring metrics.
>> 
>> Are there any other approaches we could take to detect Flink backpressure?
>> 
> 
