I have some ideas about detecting the backpressure (the blocking operators)  by checkpoint barrier .

I have some flink-jobs with checkpoint , but their checkpoints will take a long time to be completed .

I need to find out the blocking operators  , the same as the backpressure detection .

In a checkpoint object , I can get a timestamp which means the start-time , then I compute a metric in

org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing() .

The metric  is  a delta time between checkpoint.timestamp to the time when StreamTask.executeCheckpointing invoke

and I named it as checkpoint-delay-time .

It looks like the end-to-end-time metric in checkpoint  but not include async-handles  ,

For example a list of tasks A(parallelism :2 ) ---> B(parallelism :3 ) ---> C (parallelism : 1)

Checkpoint-delay-value-A : I get  the max checkpoint-delay-time from A(there are 2 instances )

Checkpoint-delay-value-B : I get  the max checkpoint-delay-time from B(there are 3 instances )

Checkpoint-delay-value-C : I get  the max checkpoint-delay-time from C(there is 1 instance)

Then I can get the other 3 delta time from checkpoint-delay-values

result-0-->A  = Checkpoint-delay-value-A  -  0

result-A-->B = Checkpoint-delay-value-B  -  Checkpoint-delay-value-A

result-B-->C = Checkpoint-delay-value-C  -  Checkpoint-delay-value-B

someone ( result-X-->Y)  which is longer than 5s (maybe other threshold)  should be the black sheep .





在 2019/1/3 下午2:43, Yun Gao :
Hello liping,

        Thank you for proposing to optimize the backpressure detection! From our 
previous experience, we think the InputBufferPoolUsageGauge and 
OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a list of 
tasks A ---> B ----> C, if we found that the OutputBufferPoolUsage of task A 
and InputBufferPoolUsage of task B is 100%, but the OutputBufferPoolUsage of task B 
is less than 100%, then it should be the task B that causes the backpressure.

       However, currently we think that the InputBufferPoolUsage and 
OutputBufferPoolUsage requires some modification to be more accurate:
1. When there are multiple inputs or outputs, the InputBufferPoolUsage and OutputBufferPoolUsage should show the maximum usage instead of the average usage [1].
          2. Currently the sender side will report backlog right before 
fulfilling the output Buffer. Together with the pre-allocate logic in the 
receiver side, the InputBufferPoolUsage may be 100% even if the data have not 
been received yet [2].

      We may need to address these problems before adopting the 
InputBufferPoolUsage  and OutputBufferPoolUsage as the backpressure indicator.

      Besides, another similar thought is that we may also add new 
InputBufferUsage and OutputBufferUsage metrics to show (number of queued 
buffers / number of all buffers) instead.


     Best,
    Yun Gao


     [1] https://issues.apache.org/jira/browse/FLINK-10981
     [2] https://issues.apache.org/jira/browse/FLINK-11082


------------------------------------------------------------------
From:裴立平 <peiliping...@gmail.com>
Send Time:2019 Jan. 3 (Thu.) 13:39
To:dev <dev@flink.apache.org>
Subject:[DISCUSS] Detection Flink Backpressure

Recently I want to optimize the way to find the positions where the
backpressures occured .

I read some blogs about flink-backpressure and have a rough idea of it .

The method which Flink adopted is thread-stack-sample ,  it's heavy and
no-lasting .

The positions where backpressures occured are very important to the
developers .

They should be treated as monitor-metrics .

Any other choice that we can take to detection the flink backpressures ?


Reply via email to