Hi,

In that case I think instead of fixing the current back pressure monitoring 
mechanism, it would be better to replace it with a new one based on output 
queues length. But I haven’t thought it through, especially with respect to 
performance implications, however my gut feeling is that it should be solvable 
in one way or another.

Piotrek

> On 3 Jan 2019, at 20:05, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> There’s the related issue of Async I/O not showing up in back pressure 
> reporting, also due to the same issue of threads not being sampled.
> 
> — Ken
> 
>> On Jan 3, 2019, at 10:25 AM, Jamie Grier <jgr...@lyft.com.INVALID> wrote:
>> 
>> One unfortunate problem with the current back-pressure detection mechanism
>> is that it doesn't work well with all of our sources.  The problem is that
>> some sources (Kinesis for sure) emit elements from threads Flink knows
>> nothing about and therefore those stack traces aren't sampled.  The result
>> is that you never see back-pressure detected in the first chain of a Flink
>> job containing that source.
>> 
>> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski <pi...@da-platform.com> wrote:
>> 
>>> Hi all,
>>> 
>>> peiliping: I think your idea could be problematic for couple of reasons.
>>> Probably minor concern is that checkpoint time could be affected not only
>>> because of the back pressure, but also because how long does it take to
>>> actually perform the checkpoint. Bigger issues are that this bottleneck
>>> detection would be limited to only during checkpointing (what if one has
>>> checkpoints only once every 1 hour? Or none at all?) AND
>>> performance/bottlenecks may change significantly during checkpointing (for
>>> example writing state for the first operator to DFS can affect indirectly
>>> down stream operators).
>>> 
>>> The idea of detecting back pressure/bottlenecks using output/input buffers
>>> is much more natural. Because in the end, almost by definition, if the
>>> output buffers are full, that means that the given task is back pressured.
>>> 
>>> Both input and output queues length are already exposed via metrics, so
>>> developers have an access to raw data to manually calculate/detect
>>> bottlenecks. It would be actually nice to automatically aggregate those
>>> metrics and provide ready to use metrics: boolean flags whether
>>> task/stage/job are back pressured or not.
>>> 
>>> Replacing current back pressure detection mechanism that probes the
>>> threads and checks which of them are waiting for buffers is another issues.
>>> Functionally it is equivalent to monitoring whether the output queues are
>>> full. This might be more hacky, but will give the same results, thus it
>>> wasn’t high on my priority list to change/refactor. It would be nice to
>>> clean this up a little bit and unify, but using metrics can also mean some
>>> additional work, since there are some known metrics related performance
>>> issues.
>>> 
>>> Piotrek
>>> 
>>>> On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
>>>> 
>>>> I have some ideas about detecting the backpressure (the blocking
>>> operators)  by checkpoint barrier .
>>>> 
>>>> I have some flink-jobs with checkpoint , but their checkpoints will take
>>> a long time to be completed .
>>>> 
>>>> I need to find out the blocking operators  , the same as the
>>> backpressure detection .
>>>> 
>>>> In a checkpoint object , I can get a timestamp which means the
>>> start-time , then I compute a metric in
>>>> 
>>>> 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing() .
>>>> 
>>>> The metric  is  a delta time between checkpoint.timestamp to the time
>>> when StreamTask.executeCheckpointing invoke
>>>> 
>>>> and I named it as checkpoint-delay-time .
>>>> 
>>>> It looks like the end-to-end-time metric in checkpoint  but not include
>>> async-handles  ,
>>>> 
>>>> For example a list of tasks A(parallelism :2 ) ---> B(parallelism :3 )
>>> ---> C (parallelism : 1)
>>>> 
>>>> Checkpoint-delay-value-A : I get  the max checkpoint-delay-time from
>>> A(there are 2 instances )
>>>> 
>>>> Checkpoint-delay-value-B : I get  the max checkpoint-delay-time from
>>> B(there are 3 instances )
>>>> 
>>>> Checkpoint-delay-value-C : I get  the max checkpoint-delay-time from
>>> C(there is 1 instance)
>>>> 
>>>> Then I can get the other 3 delta time from checkpoint-delay-values
>>>> 
>>>> result-0-->A  = Checkpoint-delay-value-A  -  0
>>>> 
>>>> result-A-->B = Checkpoint-delay-value-B  -  Checkpoint-delay-value-A
>>>> 
>>>> result-B-->C = Checkpoint-delay-value-C  -  Checkpoint-delay-value-B
>>>> 
>>>> someone ( result-X-->Y)  which is longer than 5s (maybe other
>>> threshold)  should be the black sheep .
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 在 2019/1/3 下午2:43, Yun Gao :
>>>>> Hello liping,
>>>>> 
>>>>>      Thank you for proposing to optimize the backpressure detection!
>>> From our previous experience, we think the InputBufferPoolUsageGauge and
>>> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a
>>> list of tasks A ---> B ----> C, if we found that the OutputBufferPoolUsage
>>> of task A and InputBufferPoolUsage of task B is 100%, but the
>>> OutputBufferPoolUsage of task B is less than 100%, then it should be the
>>> task B that causes the backpressure.
>>>>> 
>>>>>     However, currently we think that the InputBufferPoolUsage and
>>> OutputBufferPoolUsage requires some modification to be more accurate:
>>>>>          1. When there are multiple inputs or outputs, the
>>> InputBufferPoolUsage and OutputBufferPoolUsage  should show the maximum
>>> usage instead of the average usage [1].
>>>>>        2. Currently the sender side will report backlog right before
>>> fulfilling the output Buffer. Together with the pre-allocate logic in the
>>> receiver side, the InputBufferPoolUsage may be 100% even if the data have
>>> not been received yet [2].
>>>>> 
>>>>>    We may need to address these problems before adopting the
>>> InputBufferPoolUsage  and OutputBufferPoolUsage as the backpressure
>>> indicator.
>>>>> 
>>>>>    Besides, another similar thought is that we may also add new
>>> InputBufferUsage and OutputBufferUsage metrics to show (number of queued
>>> buffers / number of all buffers) instead.
>>>>> 
>>>>> 
>>>>>   Best,
>>>>>  Yun Gao
>>>>> 
>>>>> 
>>>>>   [1] https://issues.apache.org/jira/browse/FLINK-10981
>>>>>   [2] https://issues.apache.org/jira/browse/FLINK-11082
>>>>> 
>>>>> 
>>>>> ------------------------------------------------------------------
>>>>> From:裴立平 <peiliping...@gmail.com>
>>>>> Send Time:2019 Jan. 3 (Thu.) 13:39
>>>>> To:dev <dev@flink.apache.org>
>>>>> Subject:[DISCUSS] Detection Flink Backpressure
>>>>> 
>>>>> Recently I want to optimize the way to find the positions where the
>>>>> backpressures occured .
>>>>> 
>>>>> I read some blogs about flink-backpressure and have a rough idea of it .
>>>>> 
>>>>> The method which Flink adopted is thread-stack-sample ,  it's heavy and
>>>>> no-lasting .
>>>>> 
>>>>> The positions where backpressures occured are very important to the
>>>>> developers .
>>>>> 
>>>>> They should be treated as monitor-metrics .
>>>>> 
>>>>> Any other choice that we can take to detection the flink backpressures ?
>>>>> 
>>>> 
>>> 
>>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

Reply via email to