Hi,

In that case I think that instead of fixing the current back pressure monitoring mechanism, it would be better to replace it with a new one based on output queue lengths. I haven’t thought it through, especially with respect to the performance implications, but my gut feeling is that it should be solvable one way or another.

Piotrek
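For illustration only, a minimal sketch in plain Java (not existing Flink code) of what a per-subtask back pressure flag derived from the output queue could look like; the isBackPressured helper, the 0.95 threshold and the way the metric values are obtained are assumptions:

// Illustrative sketch only: derive a boolean back-pressure flag for one subtask
// from its output buffer metrics. The metric values are assumed to be read from
// an external reporter or the metrics REST endpoint; names and threshold are made up.
public class BackPressureFlag {

    /**
     * @param outPoolUsage   fraction of the subtask's output buffer pool in use (0.0 - 1.0)
     * @param outputQueueLen number of queued output buffers
     * @param queueCapacity  total number of buffers in the output pool
     */
    static boolean isBackPressured(double outPoolUsage, int outputQueueLen, int queueCapacity) {
        // "Output buffers full" is treated as the definition of back pressure:
        // if a task cannot hand its results downstream, it is back pressured.
        return outPoolUsage >= 0.95 || outputQueueLen >= queueCapacity;
    }

    public static void main(String[] args) {
        System.out.println(isBackPressured(1.0, 10, 10)); // true: output pool exhausted
        System.out.println(isBackPressured(0.3, 3, 10));  // false: plenty of free buffers
    }
}

The same flag could later be aggregated per task or per job, along the lines discussed below.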
> On 3 Jan 2019, at 20:05, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> There’s the related issue of Async I/O not showing up in back pressure reporting, also due to the same issue of threads not being sampled.
>
> — Ken
>
>> On Jan 3, 2019, at 10:25 AM, Jamie Grier <jgr...@lyft.com.INVALID> wrote:
>>
>> One unfortunate problem with the current back-pressure detection mechanism is that it doesn't work well with all of our sources. The problem is that some sources (Kinesis for sure) emit elements from threads Flink knows nothing about, and therefore those stack traces aren't sampled. The result is that you never see back pressure detected in the first chain of a Flink job containing such a source.
>>
>> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski <pi...@da-platform.com> wrote:
>>
>>> Hi all,
>>>
>>> peiliping: I think your idea could be problematic for a couple of reasons. A probably minor concern is that checkpoint time can be affected not only by back pressure, but also by how long it takes to actually perform the checkpoint. The bigger issues are that this bottleneck detection would be limited to checkpointing only (what if one has checkpoints only once every hour? Or none at all?) AND that performance/bottlenecks may change significantly during checkpointing (for example, writing the state of the first operator to a DFS can indirectly affect downstream operators).
>>>
>>> The idea of detecting back pressure/bottlenecks using output/input buffers is much more natural, because in the end, almost by definition, if the output buffers are full, the given task is back pressured.
>>>
>>> Both input and output queue lengths are already exposed via metrics, so developers have access to the raw data to manually calculate/detect bottlenecks. It would actually be nice to automatically aggregate those metrics and provide ready-to-use metrics: boolean flags indicating whether a task/stage/job is back pressured or not.
>>>
>>> Replacing the current back pressure detection mechanism, which probes the threads and checks which of them are waiting for buffers, is another issue. Functionally it is equivalent to monitoring whether the output queues are full. It may be more hacky, but it gives the same results, so it wasn’t high on my priority list to change/refactor. It would be nice to clean this up a little and unify the two, but using metrics can also mean some additional work, since there are some known metrics-related performance issues.
>>>
>>> Piotrek
>>>
>>>> On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
>>>>
>>>> I have some ideas about detecting backpressure (i.e. the blocking operators) via the checkpoint barrier.
>>>>
>>>> I have some Flink jobs with checkpointing enabled, but their checkpoints take a long time to complete.
>>>>
>>>> I need to find the blocking operators, which is the same problem as backpressure detection.
>>>>
>>>> From a checkpoint object I can get a timestamp, which is the start time, and then I compute a metric in org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
>>>>
>>>> The metric is the delta between checkpoint.timestamp and the time when StreamTask.executeCheckpointing() is invoked, and I named it checkpoint-delay-time.
>>>>
>>>> It is similar to the end-to-end time metric of a checkpoint, but does not include the async handles.
>>>>
>>>> For example, take a chain of tasks A (parallelism: 2) ---> B (parallelism: 3) ---> C (parallelism: 1):
>>>>
>>>> Checkpoint-delay-value-A: the max checkpoint-delay-time over A's 2 instances
>>>>
>>>> Checkpoint-delay-value-B: the max checkpoint-delay-time over B's 3 instances
>>>>
>>>> Checkpoint-delay-value-C: the max checkpoint-delay-time of C's single instance
>>>>
>>>> Then I can compute 3 deltas from these checkpoint-delay-values:
>>>>
>>>> result-0-->A = Checkpoint-delay-value-A - 0
>>>>
>>>> result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A
>>>>
>>>> result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B
>>>>
>>>> Any result-X-->Y longer than 5s (or some other threshold) points to the black sheep.
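To make the arithmetic above concrete, a minimal sketch in plain Java of the per-stage delta computation; the suspiciousStages helper is hypothetical, the A/B/C values and the 5 s threshold follow the example above, and how the per-stage delays would be collected is left open:

// Illustrative sketch: given the maximum checkpoint-delay-time observed per stage
// (in topological order A -> B -> C), compute the per-stage deltas and flag the
// ones above a threshold, mirroring the result-X-->Y calculation above.
import java.util.LinkedHashMap;
import java.util.Map;

public class CheckpointDelayDetector {

    static Map<String, Long> suspiciousStages(LinkedHashMap<String, Long> maxDelayMsPerStage,
                                              long thresholdMs) {
        Map<String, Long> suspects = new LinkedHashMap<>();
        long previous = 0L;  // delay "before" the first stage is 0 (result-0-->A)
        for (Map.Entry<String, Long> e : maxDelayMsPerStage.entrySet()) {
            long delta = e.getValue() - previous;   // result-X-->Y
            if (delta > thresholdMs) {
                suspects.put(e.getKey(), delta);
            }
            previous = e.getValue();
        }
        return suspects;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Long> delays = new LinkedHashMap<>();
        delays.put("A", 1_000L);   // max over A's 2 instances
        delays.put("B", 9_000L);   // max over B's 3 instances
        delays.put("C", 9_500L);   // C's single instance
        // B's delta (8s) exceeds the 5s threshold, so B is reported as the black sheep.
        System.out.println(suspiciousStages(delays, 5_000L));  // {B=8000}
    }
}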
>>>> On 2019/1/3 2:43 PM, Yun Gao wrote:
>>>>> Hello liping,
>>>>>
>>>>> Thank you for proposing to optimize the backpressure detection! From our previous experience, we think the InputBufferPoolUsageGauge and OutputBufferPoolUsageGauge may be useful for detecting backpressure: for a chain of tasks A ---> B ---> C, if we find that the OutputBufferPoolUsage of task A and the InputBufferPoolUsage of task B are 100%, but the OutputBufferPoolUsage of task B is less than 100%, then it should be task B that causes the backpressure.
>>>>>
>>>>> However, currently we think that the InputBufferPoolUsage and OutputBufferPoolUsage require some modifications to be more accurate:
>>>>> 1. When there are multiple inputs or outputs, the InputBufferPoolUsage and OutputBufferPoolUsage should show the maximum usage instead of the average usage [1].
>>>>> 2. Currently the sender side reports the backlog right before filling the output buffer. Together with the pre-allocation logic on the receiver side, the InputBufferPoolUsage may be 100% even if the data has not been received yet [2].
>>>>>
>>>>> We may need to address these problems before adopting the InputBufferPoolUsage and OutputBufferPoolUsage as the backpressure indicator.
>>>>>
>>>>> Besides, a similar thought is that we may also add new InputBufferUsage and OutputBufferUsage metrics that show (number of queued buffers / number of all buffers) instead.
>>>>>
>>>>> Best,
>>>>> Yun Gao
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10981
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-11082
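A minimal sketch of the rule Yun Gao describes, over plain values rather than Flink's metric classes: walk the chain and report the first task whose input pool is exhausted while its own output pool still has room. The Task holder, the findBottleneck helper and the exact 100% comparisons are assumptions:

// Illustrative sketch of the InputBufferPoolUsage / OutputBufferPoolUsage rule:
// upstream output pool full + own input pool full + own output pool not full
// => this task is the bottleneck causing the backpressure.
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class BottleneckFinder {

    static final class Task {
        final String name;
        final double inputPoolUsage;   // InputBufferPoolUsage, 0.0 - 1.0
        final double outputPoolUsage;  // OutputBufferPoolUsage, 0.0 - 1.0
        Task(String name, double in, double out) {
            this.name = name; this.inputPoolUsage = in; this.outputPoolUsage = out;
        }
    }

    /** Returns the first task in the chain that appears to cause the backpressure, if any. */
    static Optional<String> findBottleneck(List<Task> chain) {
        for (int i = 1; i < chain.size(); i++) {
            Task upstream = chain.get(i - 1);
            Task task = chain.get(i);
            if (upstream.outputPoolUsage >= 1.0
                    && task.inputPoolUsage >= 1.0
                    && task.outputPoolUsage < 1.0) {
                return Optional.of(task.name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Task> chain = Arrays.asList(
                new Task("A", 0.2, 1.0),
                new Task("B", 1.0, 0.4),
                new Task("C", 0.3, 0.1));
        System.out.println(findBottleneck(chain));  // Optional[B]
    }
}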
>>>>> ------------------------------------------------------------------
>>>>> From: 裴立平 <peiliping...@gmail.com>
>>>>> Send Time: 2019 Jan. 3 (Thu.) 13:39
>>>>> To: dev <dev@flink.apache.org>
>>>>> Subject: [DISCUSS] Detection Flink Backpressure
>>>>>
>>>>> Recently I have wanted to optimize the way we find the positions where backpressure occurs.
>>>>>
>>>>> I have read some blogs about Flink backpressure and have a rough idea of how it works.
>>>>>
>>>>> The method Flink has adopted is thread stack sampling, which is heavyweight and not continuous.
>>>>>
>>>>> The positions where backpressure occurs are very important to developers.
>>>>>
>>>>> They should be treated as monitoring metrics.
>>>>>
>>>>> Is there any other approach we could take to detect Flink backpressure?
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra