There’s the related issue of Async I/O not showing up in back pressure 
reporting, also due to the same issue of threads not being sampled.

— Ken

> On Jan 3, 2019, at 10:25 AM, Jamie Grier <jgr...@lyft.com.INVALID> wrote:
> 
> One unfortunate problem with the current back-pressure detection mechanism
> is that it doesn't work well with all of our sources.  The problem is that
> some sources (Kinesis for sure) emit elements from threads Flink knows
> nothing about and therefore those stack traces aren't sampled.  The result
> is that you never see back-pressure detected in the first chain of a Flink
> job containing that source.
> 
> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski <pi...@da-platform.com> wrote:
> 
>> Hi all,
>> 
>> peiliping: I think your idea could be problematic for couple of reasons.
>> Probably minor concern is that checkpoint time could be affected not only
>> because of the back pressure, but also because how long does it take to
>> actually perform the checkpoint. Bigger issues are that this bottleneck
>> detection would be limited to only during checkpointing (what if one has
>> checkpoints only once every 1 hour? Or none at all?) AND
>> performance/bottlenecks may change significantly during checkpointing (for
>> example writing state for the first operator to DFS can affect indirectly
>> down stream operators).
>> 
>> The idea of detecting back pressure/bottlenecks using output/input buffers
>> is much more natural. Because in the end, almost by definition, if the
>> output buffers are full, that means that the given task is back pressured.
>> 
>> Both input and output queues length are already exposed via metrics, so
>> developers have an access to raw data to manually calculate/detect
>> bottlenecks. It would be actually nice to automatically aggregate those
>> metrics and provide ready to use metrics: boolean flags whether
>> task/stage/job are back pressured or not.
>> 
>> Replacing current back pressure detection mechanism that probes the
>> threads and checks which of them are waiting for buffers is another issues.
>> Functionally it is equivalent to monitoring whether the output queues are
>> full. This might be more hacky, but will give the same results, thus it
>> wasn’t high on my priority list to change/refactor. It would be nice to
>> clean this up a little bit and unify, but using metrics can also mean some
>> additional work, since there are some known metrics related performance
>> issues.
>> 
>> Piotrek
>> 
>>> On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
>>> 
>>> I have some ideas about detecting the backpressure (the blocking
>> operators)  by checkpoint barrier .
>>> 
>>> I have some flink-jobs with checkpoint , but their checkpoints will take
>> a long time to be completed .
>>> 
>>> I need to find out the blocking operators  , the same as the
>> backpressure detection .
>>> 
>>> In a checkpoint object , I can get a timestamp which means the
>> start-time , then I compute a metric in
>>> 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing() .
>>> 
>>> The metric  is  a delta time between checkpoint.timestamp to the time
>> when StreamTask.executeCheckpointing invoke
>>> 
>>> and I named it as checkpoint-delay-time .
>>> 
>>> It looks like the end-to-end-time metric in checkpoint  but not include
>> async-handles  ,
>>> 
>>> For example a list of tasks A(parallelism :2 ) ---> B(parallelism :3 )
>> ---> C (parallelism : 1)
>>> 
>>> Checkpoint-delay-value-A : I get  the max checkpoint-delay-time from
>> A(there are 2 instances )
>>> 
>>> Checkpoint-delay-value-B : I get  the max checkpoint-delay-time from
>> B(there are 3 instances )
>>> 
>>> Checkpoint-delay-value-C : I get  the max checkpoint-delay-time from
>> C(there is 1 instance)
>>> 
>>> Then I can get the other 3 delta time from checkpoint-delay-values
>>> 
>>> result-0-->A  = Checkpoint-delay-value-A  -  0
>>> 
>>> result-A-->B = Checkpoint-delay-value-B  -  Checkpoint-delay-value-A
>>> 
>>> result-B-->C = Checkpoint-delay-value-C  -  Checkpoint-delay-value-B
>>> 
>>> someone ( result-X-->Y)  which is longer than 5s (maybe other
>> threshold)  should be the black sheep .
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2019/1/3 下午2:43, Yun Gao :
>>>> Hello liping,
>>>> 
>>>>       Thank you for proposing to optimize the backpressure detection!
>> From our previous experience, we think the InputBufferPoolUsageGauge and
>> OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a
>> list of tasks A ---> B ----> C, if we found that the OutputBufferPoolUsage
>> of task A and InputBufferPoolUsage of task B is 100%, but the
>> OutputBufferPoolUsage of task B is less than 100%, then it should be the
>> task B that causes the backpressure.
>>>> 
>>>>      However, currently we think that the InputBufferPoolUsage and
>> OutputBufferPoolUsage requires some modification to be more accurate:
>>>>           1. When there are multiple inputs or outputs, the
>> InputBufferPoolUsage and OutputBufferPoolUsage  should show the maximum
>> usage instead of the average usage [1].
>>>>         2. Currently the sender side will report backlog right before
>> fulfilling the output Buffer. Together with the pre-allocate logic in the
>> receiver side, the InputBufferPoolUsage may be 100% even if the data have
>> not been received yet [2].
>>>> 
>>>>     We may need to address these problems before adopting the
>> InputBufferPoolUsage  and OutputBufferPoolUsage as the backpressure
>> indicator.
>>>> 
>>>>     Besides, another similar thought is that we may also add new
>> InputBufferUsage and OutputBufferUsage metrics to show (number of queued
>> buffers / number of all buffers) instead.
>>>> 
>>>> 
>>>>    Best,
>>>>   Yun Gao
>>>> 
>>>> 
>>>>    [1] https://issues.apache.org/jira/browse/FLINK-10981
>>>>    [2] https://issues.apache.org/jira/browse/FLINK-11082
>>>> 
>>>> 
>>>> ------------------------------------------------------------------
>>>> From:裴立平 <peiliping...@gmail.com>
>>>> Send Time:2019 Jan. 3 (Thu.) 13:39
>>>> To:dev <dev@flink.apache.org>
>>>> Subject:[DISCUSS] Detection Flink Backpressure
>>>> 
>>>> Recently I want to optimize the way to find the positions where the
>>>> backpressures occured .
>>>> 
>>>> I read some blogs about flink-backpressure and have a rough idea of it .
>>>> 
>>>> The method which Flink adopted is thread-stack-sample ,  it's heavy and
>>>> no-lasting .
>>>> 
>>>> The positions where backpressures occured are very important to the
>>>> developers .
>>>> 
>>>> They should be treated as monitor-metrics .
>>>> 
>>>> Any other choice that we can take to detection the flink backpressures ?
>>>> 
>>> 
>> 
>> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Reply via email to