humm, that is also another possibility. Thanks for your suggestion!

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Nov 7, 2019 at 10:41 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi,
>
> We've been dealing with a similar problem of downstream consumers causing
> backpressure. One idea that a colleague of mine suggested is measuring the
> time it takes to call Collector[T].out. Since this method is used to push
> the records downstream, it will also actively block in case the buffer is
> full and there are no more floating buffers to allocate, hence causing the
> backpressure.
>
> Thus, if you know the average time it takes this function to be invoked
> when there's no backpressure, you can make an educated guess on the time it
> takes when there is pressure (you'll need to measure these times in your
> source/operator), and actively slow down the number of records being pushed
> downstream.
>
> Yuval.
>
> On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, <felipe.o.gutier...@gmail.com>
> wrote:
>
>> cool! I got to use it.
>> Now I have to get the jobID and vertice ID inside the operator.
>>
>> I forgot to mention. I am using Flink 1.9.1
>>
>> Thanks!
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Thu, Nov 7, 2019 at 4:59 AM Zhijiang <wangzhijiang...@aliyun.com>
>> wrote:
>>
>>> You can refer to this document [1] for the rest API details.
>>> Actually the backpreesure uri refers to "
>>> /jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure
>>> whether it is easy to get the jobid and vertexid.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> Send Time:2019 Nov. 7 (Thu.) 00:06
>>> To:Chesnay Schepler <ches...@apache.org>
>>> Cc:Zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
>>> Subject:Re: How can I get the backpressure signals inside my function or
>>> operator?
>>>
>>> If I can trigger the sample via rest API it is good for a POC. Then I
>>> can read from any in-memory storage using a separated thread within the
>>> operator. But what is the rest api that gives to me the ratio value from
>>> backpressure?
>>>
>>> Thanks
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>> I don't think there is a truly sane way to do this.
>>>
>>> I could envision a separate application triggering samples via the REST
>>> API, writing the results into kafka which your operator can read. This is
>>> probably the most reasonable solution I can come up with.
>>>
>>> Any attempt at accessing the TaskExecutor or metrics from within the
>>> operator are inadvisable; you'd be encroaching into truly hacky territory.
>>>
>>> You could also do your own backpressure sampling within your operator
>>> (separate thread within the operator executing the same sampling logic),
>>> but I don't know how easy it would be to re-use Flink code.
>>>
>>> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>>> Does anyone know in which metric I can rely on to know if a given
>>> operator is activating the backpressure?
>>> Or how can I call the same java object that the Flink UI calls to give
>>> me the ratio of backpressure?
>>>
>>> Thanks,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez *
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>> Hi Zhijiang,
>>>
>>> thanks for your reply. Yes, you understood correctly.
>>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>>> on the operator might be because of the way Flink runtime architecture was
>>> designed. But I was wondering what kind of signal I can get. I guess some
>>> backpressure message I could get because backpressure works to slow down
>>> the upstream operators.
>>>
>>> For example, I can see the ratio per sub-task on the web interface [1].
>>> It means the physical operators. Is there any message flowing backward that
>>> I can get? Is there anything that makes me able to not rely on some
>>> external storage?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez *
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang <wangzhijiang...@aliyun.com>
>>> wrote:
>>> Hi Felipe,
>>>
>>> That is an interesting idea to control the upstream's output based on
>>> downstream's input.
>>>
>>> If I understood correctly, the preAggregate operator would trigger
>>> flush output while the reduce operator is idle/hungry. In contrast, the 
>>> preAggregate
>>> would continue aggregating data in the case of back pressure.
>>>
>>> I think this requirement is valid, but unfortunately I guess you can not
>>> get the back pressure signal from the operator level. AIK only the upper
>>> task level can get the input/output state to decide whether to process or
>>> not.
>>>
>>> If you want to get the reduce's metric of 
>>> `Shuffle.Netty.Input.Buffers.inputQueueLength`
>>> on preAggregate side, you might rely on some external metric reporter
>>> to query it if possible.
>>>
>>> Best,
>>> Zhijiang
>>>
>>> ------------------------------------------------------------------
>>> From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> Send Time:2019 Nov. 5 (Tue.) 16:58
>>> To:user <user@flink.apache.org>
>>> Subject:How can I get the backpressure signals inside my function or
>>> operator?
>>>
>>> Hi all,
>>>
>>> let's say that I have a "source -> map .> preAggregrate -> keyBy ->
>>> reduce -> sink" job and the reducer is sending backpressure signals to the
>>> preAggregate, map and source operator. How do I get those signals inside my
>>> operator's implementation?
>>> I guess inside the function is not possible. But if I have my own
>>> operator implemented (preAggregate) can I get those backpressure signals?
>>>
>>> I want to get the messages
>>> "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] on my preAggregate
>>> operator in order to decide when I stop the pre-aggregation and flush
>>> tuples or when I keep pre aggregating. It is something like the "credit
>>> based control on the network stack" [2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
>>> [2] https://www.youtube.com/watch?v=AbqatHF3tZI
>>>
>>> Thanks!
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez *
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>>
>>>

Reply via email to