Hmm, that is another possibility. Thanks for your suggestion!

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*

On Thu, Nov 7, 2019 at 10:41 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi,
>
> We've been dealing with a similar problem of downstream consumers causing
> backpressure. One idea that a colleague of mine suggested is measuring the
> time it takes to call Collector[T].collect. Since this method is used to push
> the records downstream, it will also actively block in case the buffer is
> full and there are no more floating buffers to allocate, hence causing the
> backpressure.
>
> Thus, if you know the average time this call takes
> when there's no backpressure, you can make an educated guess on the time it
> takes when there is pressure (you'll need to measure these times in your
> source/operator), and actively slow down the number of records being pushed
> downstream.
>
> Yuval.
>
> On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, <felipe.o.gutier...@gmail.com>
> wrote:
>
>> Cool! I got to use it.
>> Now I have to get the job ID and vertex ID inside the operator.
>>
>> I forgot to mention: I am using Flink 1.9.1.
>>
>> Thanks!
>>
>> On Thu, Nov 7, 2019 at 4:59 AM Zhijiang <wangzhijiang...@aliyun.com>
>> wrote:
>>
>>> You can refer to this document [1] for the REST API details.
>>> Actually, the backpressure URI is
>>> "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure
>>> whether it is easy to get the jobid and vertexid.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From: Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> Send Time: 2019 Nov. 7 (Thu.) 00:06
>>> To: Chesnay Schepler <ches...@apache.org>
>>> Cc: Zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
>>> Subject: Re: How can I get the backpressure signals inside my function or
>>> operator?
>>>
>>> If I can trigger the sample via the REST API, that is good enough for a POC.
>>> Then I can read from any in-memory storage using a separate thread within the
>>> operator. But which REST API call gives me the backpressure ratio?
>>>
>>> Thanks
>>>
>>> On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>> I don't think there is a truly sane way to do this.
>>>
>>> I could envision a separate application triggering samples via the REST
>>> API, writing the results into Kafka, which your operator can read. This is
>>> probably the most reasonable solution I can come up with.
>>>
>>> Any attempt at accessing the TaskExecutor or metrics from within the
>>> operator is inadvisable; you'd be encroaching into truly hacky territory.
>>>
>>> You could also do your own backpressure sampling within your operator
>>> (separate thread within the operator executing the same sampling logic),
>>> but I don't know how easy it would be to re-use Flink code.
>>>
>>> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>>> Does anyone know which metric I can rely on to know if a given
>>> operator is activating the backpressure?
>>> Or how can I call the same Java object that the Flink UI calls to give
>>> me the ratio of backpressure?
>>>
>>> Thanks,
>>> Felipe
>>>
>>> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>> Hi Zhijiang,
>>>
>>> thanks for your reply. Yes, you understood correctly.
>>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>>> on the operator might be because of the way the Flink runtime architecture was
>>> designed. But I was wondering what kind of signal I can get. I guess there is
>>> some backpressure message I could get, because backpressure works to slow down
>>> the upstream operators.
>>>
>>> For example, I can see the ratio per sub-task on the web interface [1],
>>> that is, per physical operator. Is there any message flowing backward that
>>> I can get? Is there anything that lets me avoid relying on some
>>> external storage?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>>>
>>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <wangzhijiang...@aliyun.com>
>>> wrote:
>>> Hi Felipe,
>>>
>>> That is an interesting idea to control the upstream's output based on
>>> downstream's input.
>>>
>>> If I understood correctly, the preAggregate operator would trigger
>>> flush output while the reduce operator is idle/hungry. In contrast, the
>>> preAggregate would continue aggregating data in the case of back pressure.
>>>
>>> I think this requirement is valid, but unfortunately I guess you cannot
>>> get the back pressure signal at the operator level. AFAIK only the upper
>>> task level can get the input/output state to decide whether to process or
>>> not.
>>>
>>> If you want to get the reduce's metric of
>>> `Shuffle.Netty.Input.Buffers.inputQueueLength`
>>> on the preAggregate side, you might rely on some external metric reporter
>>> to query it if possible.
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From: Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> Send Time: 2019 Nov. 5 (Tue.) 16:58
>>> To: user <user@flink.apache.org>
>>> Subject: How can I get the backpressure signals inside my function or
>>> operator?
>>>
>>> Hi all,
>>>
>>> let's say that I have a "source -> map -> preAggregate -> keyBy ->
>>> reduce -> sink" job and the reducer is sending backpressure signals to the
>>> preAggregate, map and source operators. How do I get those signals inside my
>>> operator's implementation?
>>> I guess inside the function it is not possible. But if I have my own
>>> operator implemented (preAggregate), can I get those backpressure signals?
>>>
>>> I want to get the messages
>>> "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] on my preAggregate
>>> operator in order to decide when I stop the pre-aggregation and flush
>>> tuples or when I keep pre-aggregating. It is something like the "credit
>>> based control on the network stack" [2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
>>> [2] https://www.youtube.com/watch?v=AbqatHF3tZI
>>>
>>> Thanks!
>>> Felipe
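
The timing idea from Yuval's message in this thread could be sketched roughly as follows. This is only a sketch under assumptions, not Flink code: `EmitLatencyMonitor` and its methods `emit`, `observe`, `slowdownRatio` and `looksBackpressured` are hypothetical names; the baseline has to be measured by you while the job runs without backpressure; and the `Consumer` passed in stands in for whatever call pushes a record downstream in your operator (for example the Collector's `collect` method). Smoothing with an exponentially weighted moving average is my own choice here; the thread only speaks of an average.

```java
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Flink): times each downstream emit and
 * compares a smoothed emit latency against a baseline that was measured
 * while the job was running without backpressure.
 */
class EmitLatencyMonitor<T> {
    private final Consumer<T> downstream; // assumed stand-in, e.g. out::collect
    private final double baselineNanos;   // average emit time without backpressure
    private final double alpha;           // smoothing factor in (0, 1]
    private double smoothedNanos;         // exponentially weighted moving average

    EmitLatencyMonitor(Consumer<T> downstream, double baselineNanos, double alpha) {
        if (baselineNanos <= 0 || alpha <= 0 || alpha > 1) {
            throw new IllegalArgumentException("baseline must be > 0 and alpha in (0, 1]");
        }
        this.downstream = downstream;
        this.baselineNanos = baselineNanos;
        this.alpha = alpha;
        this.smoothedNanos = baselineNanos; // start from the unpressured baseline
    }

    /** Pushes one record downstream and records how long the call blocked. */
    void emit(T record) {
        long start = System.nanoTime();
        downstream.accept(record);
        observe(System.nanoTime() - start);
    }

    /** Folds one latency measurement into the moving average. */
    void observe(long elapsedNanos) {
        smoothedNanos = alpha * elapsedNanos + (1 - alpha) * smoothedNanos;
    }

    /** Returns how many times slower than the baseline emits currently are. */
    double slowdownRatio() {
        return smoothedNanos / baselineNanos;
    }

    /** An educated guess only: true once emits are at least 'threshold' times slower. */
    boolean looksBackpressured(double threshold) {
        return slowdownRatio() >= threshold;
    }
}
```

In the preAggregate case, the operator could keep aggregating while `looksBackpressured(2.0)` returns true and flush otherwise. The threshold of 2.0 is arbitrary and would need tuning against real measurements.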