Hi Felipe,

That is an interesting idea: controlling the upstream operator's output based on the downstream operator's input.
If I understood correctly, the preAggregate operator would flush its output while the reduce operator is idle/hungry. In contrast, preAggregate would continue aggregating data under back pressure.

I think this requirement is valid, but unfortunately I guess you cannot get the back pressure signal at the operator level. AFAIK only the task level above it can see the input/output state and decide whether to process or not.

If you want to get the reduce operator's `Shuffle.Netty.Input.Buffers.inputQueueLength` metric on the preAggregate side, you might rely on some external metric reporter and query it, if possible.

Best,
Zhijiang

------------------------------------------------------------------
From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user <user@flink.apache.org>
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> sink" job and the reducer is sending backpressure signals to the preAggregate, map and source operators. How do I get those signals inside my operator's implementation? I guess it is not possible inside the function. But if I implement my own operator (preAggregate), can I get those backpressure signals?

I want to read the "Shuffle.Netty.Input.Buffers.inputQueueLength" metric [1] in my preAggregate operator in order to decide when to stop pre-aggregating and flush tuples, and when to keep pre-aggregating. It is something like the "credit based control on the network stack" [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
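
For illustration only, the flush-or-keep-aggregating decision discussed in this thread could look roughly like the plain-Java sketch below. It is not a Flink operator and uses no Flink API. The class `PreAggregateSketch`, the `busyThreshold` and `maxBufferedKeys` parameters, and the `IntSupplier` that stands in for an externally polled `inputQueueLength` value are all hypothetical names chosen for this example. How that value would actually reach the preAggregate side (for instance through an external metric reporter, as suggested in the reply) is left open.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntSupplier;

// Hypothetical sketch in plain Java; none of these names are Flink APIs.
class PreAggregateSketch {
    // Partial sums per key, buffered until the next flush.
    private final Map<String, Long> partialSums = new HashMap<>();
    // Assumption: returns the most recently polled input queue length of
    // the downstream reduce operator, obtained from some external source.
    private final IntSupplier downstreamQueueLength;
    // Queue lengths below this value are treated as "downstream is idle".
    private final int busyThreshold;
    // Safety cap so the buffer cannot grow without bound under back pressure.
    private final int maxBufferedKeys;

    public PreAggregateSketch(IntSupplier downstreamQueueLength,
                              int busyThreshold,
                              int maxBufferedKeys) {
        this.downstreamQueueLength = downstreamQueueLength;
        this.busyThreshold = busyThreshold;
        this.maxBufferedKeys = maxBufferedKeys;
    }

    // Adds one record to the buffer. Returns the flushed partial aggregates
    // if downstream looks idle or the buffer is full, otherwise an empty map.
    public Map<String, Long> process(String key, long value) {
        partialSums.merge(key, value, Long::sum);
        boolean downstreamIdle = downstreamQueueLength.getAsInt() < busyThreshold;
        boolean bufferFull = partialSums.size() >= maxBufferedKeys;
        if (downstreamIdle || bufferFull) {
            return flush();
        }
        return Collections.emptyMap();
    }

    // Emits a copy of everything buffered so far and clears the buffer.
    public Map<String, Long> flush() {
        Map<String, Long> out = new HashMap<>(partialSums);
        partialSums.clear();
        return out;
    }
}
```

The cap on buffered keys is a design choice of this sketch rather than something from the thread: without it, sustained back pressure would let the buffer grow indefinitely.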