Thanks for your reply, Igal.

The reason I'm using the DataStream integration is that the messages on Kafka are JSON, and I need to convert them to protobufs for embedded functions. With remote functions I wouldn't need to do that.
With regard to performance: to rule out the possibility that the remote service is causing the slowdown, I replaced the Undertow example from the docs with 5 instances of a WebFlux service that hands the work off from the event loop to a worker, which then sleeps for 1 second. I then put an nginx instance in front of them to forward requests round-robin to the 5 WebFlux instances.

When I push 10,000 messages onto the ingress Kafka topic, it takes upwards of 100 seconds to process them all. The Flink cluster works hard for the first ~30 seconds (at ~100% CPU utilisation), then everything slows down and only tens of messages trickle through, until the Flink-side StateFun job (not the remote job) crashes and gets restarted. That is when the last few stragglers reach the egress, more than 120 seconds after the test started.

I can try to replicate this outside of my work environment if you'd like to run it yourself. In the meantime, is there a way to achieve this "sequential-per-key" behaviour with embedded functions? Those seem to be rock-solid. Maybe there are some internal classes that could at least serve as a template? I have a naive implementation ready (the one I described in my previous email), but I'm sure there are edge cases I haven't thought of.

Thanks again,
Fil

On Wed, 27 Oct 2021 at 09:43, Igal Shilman <igal.shil...@gmail.com> wrote:

> Hello Fil,
>
> Indeed, what you are describing is exactly what a remote function does.
>
> I am curious to learn more about the current performance limitations that
> you encounter with the remote functions.
>
> One thing to try in combination with the async transport is to increase
> the total number of in-flight async operations, by setting the following
> property in flink-conf.yaml:
>
> statefun.async.max-per-task
>
> to a much higher value than 1024; try experimenting with 16k, 32k, 64k and
> even higher.
>
> Let me know if that improves the situation, and we can continue from there.
>
> P.S.
>
> You've mentioned that you are using the DataStream integration; was there
> any particular reason you chose that? It has some limitations at the
> moment with respect to remote functions.
>
> Cheers,
> Igal
>
> On Wed 27. Oct 2021 at 08:49, Filip Karnicki <filip.karni...@gmail.com>
> wrote:
>
>> Hi
>>
>> I have a Kafka topic with JSON messages that I map to protobufs within a
>> data stream, and then send those to embedded stateful functions using the
>> DataStream integration API (DataStream[RoutableMessage]). From there I need
>> to make an idempotent, long-running, blocking IO call.
>>
>> I noticed that I was processing messages sequentially per Kafka
>> partition. Is there a way to process them sequentially by key only
>> (but in parallel per partition)?
>>
>> I created some code that uses the embedded functions'
>> registerAsyncOperation capabilities to make my long-running IO calls
>> effectively asynchronous, but I had to add a lot of custom code to enqueue
>> and persist any messages for a key that arrive while an IO call is in
>> flight for that key. I'm fairly confident that I can figure out all the
>> fault-tolerance cases _eventually_ (including re-sending the in-flight
>> message upon getting the UNKNOWN status back from the async operation).
>>
>> That said, am I missing a trick that would allow Flink/StateFun to take
>> care of this "parallel per partition, sequential-per-key" behaviour? Remote
>> functions don't seem to have the performance we need, even with the async
>> HTTP transport.
>>
>> Many thanks!
>> Fil
>>
> --
>
> ---
>
> about.me/igalshilman
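For reference, the per-key buffering described in this thread (at most one in-flight IO call per key, later messages for that key queued in arrival order, the in-flight message re-sent when the async operation reports UNKNOWN) can be sketched as a small state machine. This is a minimal, library-free Python sketch under stated assumptions: `PerKeySequencer`, `KeyState` and `Status` are hypothetical names, not StateFun APIs. The dict keyed by key stands in for many function instances; in a real embedded function the per-key state would live in StateFun's persisted state, and `on_complete` would be driven by the result of the operation registered via registerAsyncOperation. The sketch also assumes FAILURE means "move on to the next message"; a real job might retry or dead-letter instead.

```python
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Deque, Dict, List, Optional, Tuple


class Status(Enum):
    # Hypothetical mirror of the three outcomes discussed in the thread.
    SUCCESS = auto()
    FAILURE = auto()
    UNKNOWN = auto()  # outcome lost, e.g. after a restore from a checkpoint


@dataclass
class KeyState:
    in_flight: Optional[str] = None  # message whose IO call is running
    pending: Deque[str] = field(default_factory=deque)  # buffered messages


class PerKeySequencer:
    """At most one in-flight call per key; different keys proceed independently."""

    def __init__(self) -> None:
        self._state: Dict[str, KeyState] = {}

    def on_message(self, key: str, msg: str) -> List[Tuple[str, str]]:
        """Returns the (key, message) calls to start now."""
        st = self._state.setdefault(key, KeyState())
        if st.in_flight is None:
            st.in_flight = msg
            return [(key, msg)]  # key idle: start the IO call immediately
        st.pending.append(msg)  # key busy: buffer in arrival order
        return []

    def on_complete(self, key: str, status: Status) -> List[Tuple[str, str]]:
        """Called when the in-flight call for `key` finishes."""
        st = self._state[key]
        if status is Status.UNKNOWN:
            # The call is idempotent, so re-sending the same message is safe.
            return [(key, st.in_flight)]
        # SUCCESS or FAILURE (assumed: move on): start the next buffered message.
        if st.pending:
            st.in_flight = st.pending.popleft()
            return [(key, st.in_flight)]
        del self._state[key]  # nothing left for this key: drop its state
        return []
```

A message for a busy key is only buffered, while a message for another key starts immediately, which is the "parallel across keys, sequential per key" behaviour asked about:

```python
seq = PerKeySequencer()
seq.on_message("a", "a1")             # [("a", "a1")]  starts at once
seq.on_message("a", "a2")             # []             buffered behind a1
seq.on_message("b", "b1")             # [("b", "b1")]  other key not blocked
seq.on_complete("a", Status.UNKNOWN)  # [("a", "a1")]  re-send in-flight message
seq.on_complete("a", Status.SUCCESS)  # [("a", "a2")]  next buffered message
```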