Thanks for your reply, Igal.

The reason I'm using the DataStream integration is that the messages on
Kafka are JSON, and I need to convert them to protobufs for the embedded
functions. If I were using remote functions, I wouldn't need to do that.

Regarding performance: to rule out the possibility that the remote service
is causing the slowdown, I replaced the Undertow example from the docs with
5 instances of a WebFlux service that hands the work off from the event
loop to a worker thread, which then sleeps for 1 second. I then launched an
nginx instance to forward requests to the 5 WebFlux instances in
round-robin fashion.

When I push 10,000 messages onto the ingress Kafka topic, it takes upwards
of 100 seconds to process all of them. The Flink cluster first works
pretty hard for about 30 seconds (at ~100% CPU utilisation), then
everything slows down and only tens of messages trickle through. Eventually
the Flink-side StateFun job (not the remote job) crashes and gets
restarted, and only then do the last few stragglers reach the egress, more
than 120 seconds after the start of the test.

I can try to replicate this outside of my work environment if you'd like to
run it yourself, but in the meantime, is there a way to achieve this
'sequential-per-key' behaviour with embedded functions? Those seem to be
rock-solid. Maybe there are some internal classes that could at least
serve as a template for how to do it? I have a naive implementation ready
(the one I described in my previous email), but I'm sure there are some
edge cases I haven't thought of.
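For reference, here is a minimal, framework-agnostic sketch of the ordering part of that naive approach, assuming each IO call can be wrapped in a `CompletableFuture`. `KeyedSequencer` and `submit` are hypothetical names, not StateFun classes. It keeps its queue only in memory, so it does not cover persisting pending messages in function state or re-sending on an UNKNOWN async-operation status; those are exactly the fault-tolerance edge cases an embedded-function version would still need to handle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical helper, NOT part of the StateFun API: runs async operations
 * one at a time per key, while operations for different keys overlap freely.
 * State lives only in memory, so it is neither checkpointed nor fault tolerant.
 */
final class KeyedSequencer<K> {
    // Last operation queued for each key; guarded by synchronized (tails).
    private final Map<K, CompletableFuture<?>> tails = new HashMap<>();

    /** Queues {@code op} behind any earlier operation for the same key. */
    <T> CompletableFuture<T> submit(K key, Supplier<CompletableFuture<T>> op) {
        // The operation is started only once this gate is completed.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        CompletableFuture<T> next = gate.thenCompose(ignored -> op.get());

        CompletableFuture<?> prev;
        synchronized (tails) {
            prev = tails.put(key, next);
        }

        // Open the gate once the previous operation for this key has finished,
        // whether it succeeded or failed, so one failure cannot block the key.
        if (prev == null) {
            gate.complete(null);
        } else {
            prev.whenComplete((result, error) -> gate.complete(null));
        }

        // Forget the key once nothing else has been queued behind this operation.
        next.whenComplete((result, error) -> {
            synchronized (tails) {
                tails.remove(key, next);
            }
        });
        return next;
    }
}
```

Each `submit` chains the new call behind the previous one for the same key only, so a slow key does not hold up other keys from the same partition. The lock is held only for the map update, never while user code runs.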

Thanks again,

On Wed, 27 Oct 2021 at 09:43, Igal Shilman <> wrote:

> Hello Fil,
> Indeed, what you are describing is exactly what a remote function does.
> I am curious to learn more about the current performance limitations that
> you encounter with the remote functions.
> One thing to try in combination with the async transport is to increase
> the total number of in-flight async operations by setting the following
> property in flink-conf.yaml:
> statefun.async.max-per-task
> to a value much higher than 1024; try experimenting with 16k, 32k, 64k and
> even higher.
> Let me know if that improves the situation, and we can continue from there.
> P.S.
> You've mentioned that you are using the DataStream integration; was there
> any particular reason you chose that? It has some limitations at the
> moment with respect to remote functions.
> Cheers,
> Igal
> On Wed 27. Oct 2021 at 08:49, Filip Karnicki <>
> wrote:
>> Hi
>> I have a kafka topic with json messages that I map to protobufs within a
>> data stream, and then send those to embedded stateful functions using the
>> datastream integration api (DataStream[RoutableMessage]). From there I need
>> to make an idempotent long-running blocking IO call.
>> I noticed that I was processing messages sequentially per kafka
>> partition. Is there a way that I could process them sequentially by key
>> only (but in parallel per partition)?
>> I created some code that uses the embedded functions'
>> registerAsyncOperation capabilities to make my long-running IO calls
>> effectively asynchronous, but I had to add all this custom code to enqueue
>> and persist any messages for a key that came in while there was an
>> in-flight IO call happening for that key. I'm fairly confident that I can
>> figure out all the fault tolerance cases _eventually_ (including re-sending
>> the in-flight message upon getting the UNKNOWN status back from the async
>> operation).
>> That said, am I missing a trick that would allow Flink/StateFun to take
>> care of this "parallel per partition, sequential-per-key" behaviour? Remote
>> functions don't seem to have the performance we need, even with the async
>> HTTP transport.
>> Many thanks!
>> Fil