Glad to hear it worked out for you :-)

Cheers,
Igal
On Tue, Nov 2, 2021 at 1:57 PM Filip Karnicki <filip.karni...@gmail.com> wrote:

> Hi All
>
> Just an update for future reference: it turned out that the machine we
> were using for this test didn't have enough memory for what we were
> asking it to do. It was that simple. The upside is that not even with the
> world's most unstable cluster did we manage to lose a single message.
>
> Just as an aside, we got the best results by switching back to undertow,
> but we ended up using it slightly differently than the current example in
> the docs suggests. We needed to pass the work onto a worker thread
> because we had a blocking call in our function:
>
> class Handler extends HttpHandler {
>   (...)
>
>   def handleRequest(exchange: HttpServerExchange): Unit = {
>     if (exchange.isInIoThread) {
>       exchange.dispatch(this)
>       return
>     }
>     exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
>       flinkHandler
>         .handle(Slices.wrap(bytes))
>         .whenComplete((response: Slice, exception: Throwable) =>
>           onComplete(exchange, response, exception))
>     })
>   }
>
>   def onComplete(exchange: HttpServerExchange, slice: Slice,
>       throwable: Throwable) = (... as per the example)
> }
>
> Many thanks again for your help, Igal
>
> On Wed, 27 Oct 2021 at 13:59, Filip Karnicki <filip.karni...@gmail.com> wrote:
>
>> Thanks for your reply Igal
>>
>> The reason why I'm using the data stream integration is that the
>> messages on kafka are in .json, and I need to convert them to protobufs
>> for embedded functions. If I were using remote functions, I wouldn't
>> need to do that.
>>
>> With regards to performance, in order to rule out the possibility that
>> the remote service was causing the slowdown, I replaced the undertow
>> example from the docs with 5 instances of webflux services that hand off
>> the work from an event loop to a worker, which then sleeps for 1 second.
>> I then launched an nginx instance to forward the requests in a
>> round-robin fashion to the 5 webflux instances.
>>
>> When I push 10,000 messages onto the ingress kafka topic, it takes
>> upwards of 100 seconds to process all the messages. The flink cluster
>> first works pretty hard for about 30 seconds (at ~100% cpu utilisation),
>> then everything slows down and eventually I see tens of messages
>> trickling through until the flink-side statefun job (not the remote job)
>> crashes and gets restarted, which is when the last few stragglers get
>> sent to the egress, 120+ seconds after the launch of the test.
>>
>> I can try to replicate this outside of my work environment if you'd like
>> to run it yourself, but in the meantime, is there a way to achieve this
>> 'sequential-per-key' behaviour with embedded functions? Those seem to be
>> rock-solid. Maybe there are some internal classes that would at least
>> provide a template on how to do it? I have a naive implementation ready
>> (the one I described in the previous email) but I'm sure there are some
>> edge cases I haven't thought of.
>>
>> Thanks again,
>> Fil
>>
>> On Wed, 27 Oct 2021 at 09:43, Igal Shilman <igal.shil...@gmail.com> wrote:
>>
>>> Hello Fil,
>>>
>>> Indeed, what you are describing is exactly what a remote function does.
>>>
>>> I am curious to learn more about the current performance limitations
>>> that you are encountering with remote functions.
>>>
>>> One thing to try, in combination with the async transport, is to
>>> increase the total number of in-flight async operations by setting the
>>> following property in flink-conf.yaml:
>>>
>>> statefun.async.max-per-task
>>>
>>> to a much higher value than 1024; try experimenting with 16k, 32k, 64k
>>> and even higher.
>>>
>>> Let me know if that improves the situation, and we can continue from
>>> there.
>>>
>>> P.S. You've mentioned that you are using the data stream integration;
>>> was there any particular reason you chose that? It has some limitations
>>> at the moment with respect to remote functions.
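[For readers skimming the archive: the property suggested above goes into flink-conf.yaml. The value shown is just one of the starting points mentioned in the thread, not a recommendation.]

```yaml
# flink-conf.yaml
# Raise the per-task cap on in-flight async operations from 1024;
# the thread suggests experimenting with 16k, 32k, 64k and higher.
statefun.async.max-per-task: 16384
```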
>>>
>>> Cheers,
>>> Igal
>>>
>>> On Wed 27. Oct 2021 at 08:49, Filip Karnicki <filip.karni...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I have a kafka topic with json messages that I map to protobufs within
>>>> a data stream, and then send those to embedded stateful functions
>>>> using the datastream integration api (DataStream[RoutableMessage]).
>>>> From there I need to make an idempotent, long-running, blocking IO
>>>> call.
>>>>
>>>> I noticed that I was processing messages sequentially per kafka
>>>> partition. Is there a way that I could process them sequentially by
>>>> key only (but in parallel per partition)?
>>>>
>>>> I created some code that uses the embedded functions'
>>>> registerAsyncOperation capability to make my long-running IO calls
>>>> effectively asynchronous, but I had to add all this custom code to
>>>> enqueue and persist any messages for a key that came in while there
>>>> was an in-flight IO call happening for that key. I'm fairly confident
>>>> that I can figure out all the fault-tolerance cases _eventually_
>>>> (including re-sending the in-flight message upon getting the UNKNOWN
>>>> status back from the async operation).
>>>>
>>>> That said, am I missing a trick that would allow Flink/statefun to
>>>> take care of this "parallel-per-partition, sequential-per-key"
>>>> behaviour? Remote functions don't seem to have the performance we
>>>> need, even with async http transport.
>>>>
>>>> Many thanks!
>>>> Fil
>>>
>>> --
>>> about.me/igalshilman
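[Editor sketch, for readers who land on this thread: the DataStream integration Fil describes builds a DataStream[RoutableMessage] that routes each message to an embedded function instance by key. This is a minimal sketch against the statefun-flink-datastream API; the function type, the `toProto` converter, and the `keyOf` extractor are hypothetical placeholders, not code from the thread.]

```scala
import org.apache.flink.statefun.flink.datastream.{RoutableMessage, RoutableMessageBuilder}
import org.apache.flink.statefun.sdk.FunctionType
import org.apache.flink.streaming.api.scala._

// Hypothetical embedded function type that will receive the messages.
val TargetFn = new FunctionType("example", "my-embedded-fn")

// jsonStream: raw JSON strings read from the Kafka ingress topic.
// toProto / keyOf are hypothetical: the JSON -> protobuf conversion and
// key extraction the thread says happen before handing off to statefun.
def toRoutable(jsonStream: DataStream[String]): DataStream[RoutableMessage] =
  jsonStream.map { json =>
    val proto = toProto(json)
    RoutableMessageBuilder
      .builder()
      // all messages with the same key address the same function instance,
      // which is what gives sequential-per-key processing inside statefun
      .withTargetAddress(TargetFn, keyOf(proto))
      .withMessageBody(proto)
      .build()
  }
```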