Hi,

We've been dealing with a similar problem of downstream consumers causing backpressure. One idea that a colleague of mine suggested is measuring the time it takes to call Collector[T].collect. Since this method is used to push the records downstream, it will block when the output buffer is full and there are no more floating buffers to allocate, which is where the backpressure shows up inside your operator.
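To make the measurement idea concrete, here is a minimal sketch in plain Java. It is not Flink API: `TimedEmitter`, its constructor parameters, and the baseline/factor values are hypothetical names and numbers chosen for illustration, and the `Consumer` stands in for the collector's `collect` call. It keeps an exponentially weighted moving average of the call time and reports pressure when that average exceeds an assumed no-backpressure baseline by a chosen factor.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Flink): times each downstream emit call
 * and keeps an exponentially weighted moving average (EWMA) of the duration.
 */
class TimedEmitter<T> {
    private final Consumer<T> downstream; // stand-in for the collector's collect call
    private final LongSupplier clock;     // nanosecond clock, injectable for testing
    private final double alpha;           // EWMA smoothing factor in (0, 1]
    private final double baselineNanos;   // assumed average call time without backpressure
    private final double factor;          // how many times slower counts as pressure
    private double ewmaNanos = -1;        // -1 means no sample recorded yet

    TimedEmitter(Consumer<T> downstream, LongSupplier clock,
                 double alpha, double baselineNanos, double factor) {
        this.downstream = downstream;
        this.clock = clock;
        this.alpha = alpha;
        this.baselineNanos = baselineNanos;
        this.factor = factor;
    }

    /** Emits one record and folds the elapsed time into the moving average. */
    void emit(T record) {
        long start = clock.getAsLong();
        downstream.accept(record);        // in a real job this is where the call would block
        long elapsed = clock.getAsLong() - start;
        ewmaNanos = ewmaNanos < 0 ? elapsed : alpha * elapsed + (1 - alpha) * ewmaNanos;
    }

    /** Current moving average of the emit time in nanoseconds, or -1 if no sample yet. */
    double averageNanos() {
        return ewmaNanos;
    }

    /** True when the average emit time exceeds baseline * factor. */
    boolean underPressure() {
        return ewmaNanos > baselineNanos * factor;
    }

    public static void main(String[] args) {
        // The no-op consumer and the 50 microsecond baseline are placeholders.
        TimedEmitter<String> emitter = new TimedEmitter<>(
                record -> { /* downstream work would happen here */ },
                System::nanoTime, 0.2, 50_000.0, 5.0);
        for (int i = 0; i < 1_000; i++) {
            emitter.emit("record-" + i);
        }
        System.out.printf("avg emit time: %.0f ns, under pressure: %b%n",
                emitter.averageNanos(), emitter.underPressure());
    }
}
```

In an operator you would route every emit through such a wrapper and check `underPressure()` before deciding whether to flush or keep aggregating. The baseline has to be calibrated on your own job while it runs without pressure.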
Thus, if you know the average time a call to this method takes when there's no backpressure, you can compare it against the times you measure in your source/operator, make an educated guess about when there is pressure, and actively slow down the rate at which records are pushed downstream.

Yuval.

On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, <[email protected]> wrote:

> cool! I got to use it.
> Now I have to get the jobID and vertex ID inside the operator.
>
> I forgot to mention. I am using Flink 1.9.1
>
> Thanks!
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Thu, Nov 7, 2019 at 4:59 AM Zhijiang <[email protected]>
> wrote:
>
>> You can refer to this document [1] for the REST API details.
>> Actually the backpressure URI refers to
>> "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether
>> it is easy to get the jobid and vertexid.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From:Felipe Gutierrez <[email protected]>
>> Send Time:2019 Nov. 7 (Thu.) 00:06
>> To:Chesnay Schepler <[email protected]>
>> Cc:Zhijiang <[email protected]>; user <[email protected]>
>> Subject:Re: How can I get the backpressure signals inside my function or
>> operator?
>>
>> If I can trigger the sample via the REST API it is good for a POC. Then
>> I can read from any in-memory storage using a separate thread within the
>> operator. But which REST API gives me the backpressure ratio value?
>>
>> Thanks
>>
>> On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <[email protected]>
>> wrote:
>>
>> I don't think there is a truly sane way to do this.
>>
>> I could envision a separate application triggering samples via the REST
>> API, writing the results into Kafka which your operator can read. This is
>> probably the most reasonable solution I can come up with.
>>
>> Any attempt at accessing the TaskExecutor or metrics from within the
>> operator is inadvisable; you'd be encroaching into truly hacky territory.
>>
>> You could also do your own backpressure sampling within your operator
>> (separate thread within the operator executing the same sampling logic),
>> but I don't know how easy it would be to re-use Flink code.
>>
>> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>> Does anyone know which metric I can rely on to know if a given
>> operator is activating the backpressure?
>> Or how can I call the same Java object that the Flink UI calls to give me
>> the ratio of backpressure?
>>
>> Thanks,
>> Felipe
>>
>> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
>> [email protected]> wrote:
>> Hi Zhijiang,
>>
>> thanks for your reply. Yes, you understood correctly.
>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>> on the operator might be because of the way the Flink runtime
>> architecture was designed. But I was wondering what kind of signal I can
>> get. I guess I could get some backpressure message, because backpressure
>> works to slow down the upstream operators.
>>
>> For example, I can see the ratio per sub-task on the web interface [1].
>> It means the physical operators. Is there any message flowing backward
>> that I can get? Is there anything that makes me able to not rely on some
>> external storage?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>>
>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <[email protected]>
>> wrote:
>> Hi Felipe,
>>
>> That is an interesting idea to control the upstream's output based on
>> downstream's input.
>>
>> If I understood correctly, the preAggregate operator would trigger flush
>> output while the reduce operator is idle/hungry. In contrast, the
>> preAggregate would continue aggregating data in the case of back
>> pressure.
>>
>> I think this requirement is valid, but unfortunately I guess you cannot
>> get the back pressure signal from the operator level. AFAIK only the
>> upper task level can get the input/output state to decide whether to
>> process or not.
>>
>> If you want to get the reduce's metric of
>> `Shuffle.Netty.Input.Buffers.inputQueueLength`
>> on the preAggregate side, you might rely on some external metric reporter
>> to query it if possible.
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From:Felipe Gutierrez <[email protected]>
>> Send Time:2019 Nov. 5 (Tue.) 16:58
>> To:user <[email protected]>
>> Subject:How can I get the backpressure signals inside my function or
>> operator?
>>
>> Hi all,
>>
>> let's say that I have a "source -> map -> preAggregate -> keyBy ->
>> reduce -> sink" job and the reducer is sending backpressure signals to
>> the preAggregate, map and source operators. How do I get those signals
>> inside my operator's implementation?
>> I guess inside the function it is not possible. But if I have my own
>> operator implemented (preAggregate), can I get those backpressure
>> signals?
>>
>> I want to get the messages "Shuffle.Netty.Input.Buffers.inputQueueLength"
>> [1] on my preAggregate operator in order to decide when I stop the
>> pre-aggregation and flush tuples or when I keep pre-aggregating. It is
>> something like the "credit based control on the network stack" [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
>> [2] https://www.youtube.com/watch?v=AbqatHF3tZI
>>
>> Thanks!
>> Felipe
