Hello Igal,

Thanks, these are two valuable tips I will try. I had misunderstood the
maxNumBatchRequests parameter: I thought it was the maximum number of “function
calls” sent within one web request. I’ll experiment a bit with it. It would
still be static for the Flink cluster’s lifetime, but at least we could have a
different setting per remote function definition.

And yes, autoscaling is also an option that makes sense. But I’ll have to test
whether it reacts fast enough, because a newly spawned container might need to
deserialize some data, connect to a database, etc., so scaling can be slow. And
of course, at some point the limits of the cluster might also be reached. As it
is, Flink doesn’t slow down sending messages while the service is not yet ready.

I guess with the two tips together we can achieve a fairly robust setup.

For future design, a simple solution would be to allow defining an HTTP status
code in the module definition that signals backpressure. One could then define
that a status 429 or 503 makes Flink create backpressure just as if
“statefun.async.max-per-task” had been reached. More sophisticated rules, such
as dynamic concurrency limiting based on latencies, are also conceivable, but
more complicated.

 

Best,

Christian

From: Igal Shilman <[email protected]>
Sent: Thursday, 7 October 2021 21:32
To: Christian Krudewig (Corporate Development) <[email protected]>
Cc: [email protected]
Subject: Re: How to create backpressure with a Statefun remote function?

Hello Christian,

The challenge with generic back pressure and remote functions is that StateFun
doesn't know whether it targets a single process or a fleet of processes behind
a load balancer and an autoscaler.

Triggering back pressure too early might prevent the autoscaling from ever
kicking in.

Indeed, the parameter you found will trigger back pressure when the total
number of requests per task slot exceeds that value. There is an additional
parameter that triggers back pressure per function address.

It is called: maxNumBatchRequests

It is more fine-grained than the per-task-slot parameter. Reducing this value
might be recommended if the total processing time of a single message is
potentially high (CPU-intensive or long I/O).
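
A simplified model of the two limits described above, in Python. It is not StateFun's implementation: `TwoLevelLimits` and its fields are hypothetical, with `max_per_task` standing in for `statefun.async.max-per-task` and `max_per_address` for `maxNumBatchRequests`. It shows why the per-address limit is finer-grained: a single address with slow messages can reach its cap long before the task-wide total does.

```python
from collections import Counter


class TwoLevelLimits:
    """Hypothetical model: back-pressure when either limit is reached."""

    def __init__(self, max_per_task: int, max_per_address: int) -> None:
        self.max_per_task = max_per_task        # stands in for statefun.async.max-per-task
        self.max_per_address = max_per_address  # stands in for maxNumBatchRequests
        self.total = 0
        self.per_address = Counter()

    def should_backpressure(self, address: str) -> bool:
        return (self.total >= self.max_per_task
                or self.per_address[address] >= self.max_per_address)

    def accept(self, address: str) -> None:
        # A message for `address` is now pending (sent or buffered).
        self.total += 1
        self.per_address[address] += 1

    def complete(self, address: str) -> None:
        self.total -= 1
        self.per_address[address] -= 1


if __name__ == "__main__":
    limits = TwoLevelLimits(max_per_task=100, max_per_address=3)
    for _ in range(3):
        limits.accept("fn/user-42")  # one slow, CPU-heavy address
    print(limits.should_backpressure("fn/user-42"))  # True
    print(limits.total)  # 3, far below max_per_task
```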

 

I think this use case is valid, and we need to think about the case where the
set of remote functions is static (up to a manual scale-up).

I don't have a good idea at the moment, as deciding whether or not to back
pressure requires some sort of global knowledge.

What I would recommend, if it fits your infrastructure, is to consider an
autoscaler for the remote functions based on a metric that makes sense to you,
and to use the max-in-flight parameter as a high safety net.
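
A sketch of the scaling rule such an autoscaler could use, assuming a proportional rule of the same shape as the Kubernetes Horizontal Pod Autoscaler (desired = ceil(current × metric / target)). The function name, the clamp bounds, and the choice of metric (for example, average in-flight requests per replica) are assumptions for illustration.

```python
import math


def desired_replicas(current_replicas: int, current_metric: float,
                     target_metric: float, min_replicas: int = 1,
                     max_replicas: int = 20) -> int:
    """Proportional scaling rule, clamped to [min_replicas, max_replicas].

    `current_metric` is the observed per-replica average (hypothetical choice,
    e.g. in-flight requests per replica); `target_metric` is the desired value.
    """
    if target_metric <= 0:
        raise ValueError("target_metric must be positive")
    raw = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, raw))


if __name__ == "__main__":
    print(desired_replicas(4, 12, 8))    # 6: overloaded, scale up
    print(desired_replicas(6, 2, 8))     # 2: mostly idle, scale down
    print(desired_replicas(10, 100, 8))  # 20: capped at max_replicas
```

A real HPA additionally applies a tolerance band and a stabilization window to avoid flapping, which this sketch omits. The rule only decides how many replicas to request; new replicas may take a while to become ready, which is why a static max-in-flight limit remains useful as a safety net.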

 

Cheers,

Igal

On Thu, 7 Oct 2021 at 14:03, Christian Krudewig (Corporate Development)
<[email protected]> wrote:

Hello fellow Flink users,

How do you create backpressure with Statefun remote functions? I'm using an
asynchronous web server for the remote function (Python aiohttp on uvicorn)
that accepts more requests than its CPU-bound backend can handle. That can make
the requests time out and can trigger a restart loop of the whole Flink
pipeline, though only in situations where so many records are coming into the
ingress Kafka stream that the service cannot handle them anymore.

Desired behavior: Flink only consumes as many records from the input stream
as the pipeline can handle instead of overloading the remote functions.
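
The desired behavior is ordinary backpressure: the reader of the input advances only when downstream has room. A minimal, Flink-independent illustration in Python with a bounded `asyncio.Queue`; `run_pipeline`, `capacity`, and `work_time` are hypothetical names for this sketch, and the sleep stands in for the remote function.

```python
import asyncio


async def run_pipeline(records, capacity, work_time=0.01):
    """Hypothetical sketch: a fast source feeding a slow sink through a bounded queue."""
    queue = asyncio.Queue(maxsize=capacity)
    processed = []
    max_backlog = 0

    async def source():
        nonlocal max_backlog
        for record in records:
            # put() waits while the queue is full, so the source only reads
            # as fast as the sink drains: this is the backpressure.
            await queue.put(record)
            max_backlog = max(max_backlog, queue.qsize())
        await queue.put(None)  # end-of-stream marker

    async def sink():
        while (record := await queue.get()) is not None:
            await asyncio.sleep(work_time)  # stand-in for the slow remote function
            processed.append(record)

    await asyncio.gather(source(), sink())
    return processed, max_backlog


if __name__ == "__main__":
    done, backlog = asyncio.run(run_pipeline(list(range(20)), capacity=3))
    print(len(done), backlog)
```

Flink propagates backpressure between its own operators in a similar way, through bounded network buffers; the open question here is how a remote function can trigger it.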

What I tried so far:
1. Set "statefun.async.max-per-task" in flink-conf.yaml to a low number. This
works, but it is one global static setting for all functions, which cannot be
changed without restarting the cluster when the remote functions are scaled
up or down.
2. Add concurrency limiting to the remote function service. If the function
service returns failure codes (500, 503, 429), that doesn't seem to create
backpressure; it is handled like a normal failure by Flink, with retries
until finally the whole pipeline gets restarted.
3. Try the new "io.statefun.transports.v1/async" transport type for the
endpoints with a low "pool_size" parameter. But reaching the pool size seems
to be treated like an error instead of creating backpressure, with the same
effect as option 2.
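
Option 2 can be sketched as a small admission gate in front of the handler. This is a minimal, hypothetical Python sketch assuming an asyncio-based server such as aiohttp; `ConcurrencyGate` and `handle` are not aiohttp APIs, and the handler returns plain `(status, body)` tuples instead of aiohttp responses to stay self-contained.

```python
import asyncio


class ConcurrencyGate:
    """Admit at most `limit` concurrent requests; reject the rest with 429."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0

    async def handle(self, work):
        # Check-and-increment needs no lock: asyncio runs one task at a time
        # and there is no await between the check and the increment.
        if self.active >= self.limit:
            return 429, b"busy"
        self.active += 1
        try:
            return 200, await work()
        finally:
            self.active -= 1


async def demo():
    gate = ConcurrencyGate(limit=2)

    async def cpu_bound_stand_in():
        await asyncio.sleep(0.05)  # placeholder for the real backend call
        return b"ok"

    results = await asyncio.gather(*(gate.handle(cpu_bound_stand_in) for _ in range(4)))
    return [status for status, _ in results]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [200, 200, 429, 429]
```

As described in option 2, StateFun treats such a 429 as an ordinary failure and retries, so the gate protects the backend from overload but does not by itself slow down consumption from the ingress.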

Is there some other option? How should it be done by design?

Thanks,

Christian


