.. Maybe we should add a way to register those threads such that they are also sampled. Thoughts?
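For illustration only, a minimal sketch of what such a registry might look like — the class and method names below are invented for this sketch; no such API exists in Flink today:

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical registry that source connectors could use to expose their
    // internal emitter threads (e.g. Kinesis fetcher threads) so that the
    // stack-trace sampler can include them when probing for back pressure.
    public final class EmitterThreadRegistry {
        private static final Set<Thread> THREADS = ConcurrentHashMap.newKeySet();

        private EmitterThreadRegistry() {}

        public static void register(Thread t) {
            THREADS.add(t);
        }

        public static void unregister(Thread t) {
            THREADS.remove(t);
        }

        // The sampler would sample these in addition to the main task thread.
        public static Set<Thread> sampledThreads() {
            return Collections.unmodifiableSet(THREADS);
        }
    }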
On Thu, Jan 3, 2019 at 10:25 AM Jamie Grier <jgr...@lyft.com> wrote:

> One unfortunate problem with the current back-pressure detection mechanism
> is that it doesn't work well with all of our sources. The problem is that
> some sources (Kinesis for sure) emit elements from threads Flink knows
> nothing about, and therefore those stack traces aren't sampled. The result
> is that you never see back-pressure detected in the first chain of a Flink
> job containing such a source.
>
> On Thu, Jan 3, 2019 at 2:12 AM Piotr Nowojski <pi...@da-platform.com> wrote:
>
>> Hi all,
>>
>> peiliping: I think your idea could be problematic for a couple of reasons.
>> A minor concern is that checkpoint time can be affected not only by back
>> pressure but also by how long it takes to actually perform the checkpoint.
>> The bigger issues are that this bottleneck detection would only work while
>> a checkpoint is in progress (what if one has checkpoints only once every
>> hour? Or none at all?) AND that performance/bottlenecks may change
>> significantly during checkpointing (for example, writing the state of the
>> first operator to DFS can indirectly affect downstream operators).
>>
>> The idea of detecting back pressure/bottlenecks using output/input buffers
>> is much more natural, because in the end, almost by definition, if the
>> output buffers are full, the given task is back pressured.
>>
>> Both input and output queue lengths are already exposed via metrics, so
>> developers have access to the raw data to calculate/detect bottlenecks
>> manually. It would be nice to aggregate those metrics automatically and
>> provide ready-to-use metrics: boolean flags indicating whether a
>> task/stage/job is back pressured or not.
>>
>> Replacing the current back-pressure detection mechanism, which probes the
>> threads and checks which of them are waiting for buffers, is another
>> issue. Functionally it is equivalent to monitoring whether the output
>> queues are full. It may be more hacky, but it gives the same results, so
>> it wasn't high on my priority list to change/refactor. It would be nice to
>> clean this up a little and unify the two approaches, but using metrics can
>> also mean some additional work, since there are some known metrics-related
>> performance issues.
>>
>> Piotrek
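A minimal sketch of the boolean flags Piotr describes, assuming per-task
outPoolUsage gauge values (floats in [0, 1]) have already been collected from
Flink's existing network buffer metrics — the aggregation code itself is
hypothetical, not Flink code:

    // Sketch: aggregate per-task output-buffer-pool usage into boolean flags.
    public final class BackPressureFlags {

        // Almost by definition: full output buffers mean the task is back
        // pressured. A threshold slightly below 1.0 could reduce flapping.
        public static boolean taskIsBackPressured(float outPoolUsage) {
            return outPoolUsage >= 1.0f;
        }

        // A stage or job is flagged if any of its tasks is back pressured.
        public static boolean anyBackPressured(float[] outPoolUsages) {
            for (float usage : outPoolUsages) {
                if (taskIsBackPressured(usage)) {
                    return true;
                }
            }
            return false;
        }
    }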
>> > On 3 Jan 2019, at 10:35, peiliping <peiliping...@gmail.com> wrote:
>> >
>> > I have an idea for detecting back pressure (i.e. the blocking operators)
>> > via the checkpoint barrier.
>> >
>> > I have some Flink jobs with checkpointing enabled, but their checkpoints
>> > take a long time to complete. I need to find the blocking operators,
>> > which is the same goal as back-pressure detection.
>> >
>> > From a checkpoint object I can get a timestamp that marks its start
>> > time; I then compute a metric in
>> > org.apache.flink.streaming.runtime.tasks.StreamTask.executeCheckpointing().
>> >
>> > The metric is the delta between checkpoint.timestamp and the time when
>> > StreamTask.executeCheckpointing() is invoked; I named it
>> > checkpoint-delay-time.
>> >
>> > It is similar to the checkpoint's end-to-end-time metric, but it does
>> > not include the async handles.
>> >
>> > For example, take a chain of tasks A (parallelism 2) ---> B
>> > (parallelism 3) ---> C (parallelism 1):
>> >
>> > Checkpoint-delay-value-A: the max checkpoint-delay-time over A (2 instances)
>> > Checkpoint-delay-value-B: the max checkpoint-delay-time over B (3 instances)
>> > Checkpoint-delay-value-C: the max checkpoint-delay-time over C (1 instance)
>> >
>> > From these checkpoint-delay-values I can then compute three deltas:
>> >
>> > result-0-->A = Checkpoint-delay-value-A - 0
>> > result-A-->B = Checkpoint-delay-value-B - Checkpoint-delay-value-A
>> > result-B-->C = Checkpoint-delay-value-C - Checkpoint-delay-value-B
>> >
>> > Any result-X-->Y longer than 5s (or some other threshold) points to the
>> > black sheep.
>> >
>> > On 2019/1/3 at 2:43 PM, Yun Gao wrote:
>> >> Hello liping,
>> >>
>> >> Thank you for proposing to optimize back-pressure detection! From our
>> >> previous experience, the InputBufferPoolUsageGauge and
>> >> OutputBufferPoolUsageGauge may be useful for detecting back pressure:
>> >> for a chain of tasks A ---> B ---> C, if we find that the
>> >> OutputBufferPoolUsage of task A and the InputBufferPoolUsage of task B
>> >> are 100%, but the OutputBufferPoolUsage of task B is less than 100%,
>> >> then it is task B that causes the back pressure.
>> >>
>> >> However, we currently think the InputBufferPoolUsage and
>> >> OutputBufferPoolUsage require some modifications to be more accurate:
>> >> 1. When there are multiple inputs or outputs, the InputBufferPoolUsage
>> >> and OutputBufferPoolUsage should show the maximum usage instead of the
>> >> average usage [1].
>> >> 2. Currently the sender side reports the backlog right before filling
>> >> the output buffer. Together with the pre-allocation logic on the
>> >> receiver side, the InputBufferPoolUsage may be 100% even if the data
>> >> has not been received yet [2].
>> >>
>> >> We may need to address these problems before adopting the
>> >> InputBufferPoolUsage and OutputBufferPoolUsage as the back-pressure
>> >> indicator.
>> >>
>> >> Besides, a similar thought is that we could add new InputBufferUsage
>> >> and OutputBufferUsage metrics that show (number of queued buffers /
>> >> number of all buffers) instead.
>> >>
>> >> Best,
>> >> Yun Gao
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-10981
>> >> [2] https://issues.apache.org/jira/browse/FLINK-11082
>> >>
>> >> ------------------------------------------------------------------
>> >> From: 裴立平 <peiliping...@gmail.com>
>> >> Send Time: 2019 Jan. 3 (Thu.) 13:39
>> >> To: dev <dev@flink.apache.org>
>> >> Subject: [DISCUSS] Detection Flink Backpressure
>> >>
>> >> Recently I have wanted to optimize the way we find the positions where
>> >> back pressure occurs.
>> >>
>> >> I have read some blogs about Flink back pressure and have a rough idea
>> >> of it. The method Flink adopts is thread-stack sampling, which is heavy
>> >> and not continuous.
>> >>
>> >> The positions where back pressure occurs are very important to
>> >> developers, and they should be treated as monitoring metrics.
>> >>
>> >> What other approaches could we take to detect Flink back pressure?
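To make the checkpoint-delay arithmetic above concrete, here is a minimal
runnable sketch (all names invented) of the per-stage delta check for the
A ---> B ---> C example:

    // Sketch of peiliping's checkpoint-delay heuristic. delays[i] is the max
    // checkpoint-delay-time over all subtask instances of stage i, i.e. the
    // time from the checkpoint trigger timestamp until executeCheckpointing()
    // runs on that subtask.
    public final class CheckpointDelayHeuristic {

        // Returns the index of the first stage whose delta exceeds the
        // threshold (the "black sheep"), or -1 if none does.
        public static int findBottleneckStage(long[] delays, long thresholdMs) {
            long previous = 0;
            for (int i = 0; i < delays.length; i++) {
                long delta = delays[i] - previous; // result-X-->Y
                if (delta > thresholdMs) {
                    return i;
                }
                previous = delays[i];
            }
            return -1;
        }

        public static void main(String[] args) {
            // Max checkpoint-delay-time per stage, in ms (values invented):
            // A (2 instances), B (3 instances), C (1 instance).
            long[] delays = {400, 6800, 7100};
            // result-0-->A = 400, result-A-->B = 6400, result-B-->C = 300,
            // so stage B (index 1) exceeds the 5s threshold.
            System.out.println(findBottleneckStage(delays, 5000)); // prints 1
        }
    }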