Glad to hear it worked out for you :-)

Cheers,
Igal

On Tue, Nov 2, 2021 at 1:57 PM Filip Karnicki <filip.karni...@gmail.com>
wrote:

> Hi All
>
> Just an update for future reference, it turned out that the machine we
> were using for this test didn't have enough memory for what we were asking
> it to do. It was that simple. The upside is that not even with the world's
> most unstable cluster did we manage to lose a single message.
>
> Just as an aside, we got the best results by switching back to undertow,
> but we ended up using it slightly differently than the current example in
> the docs suggests. We needed to hand the work off to a worker thread
> because we had a blocking call in our function:
>
> class Handler extends HttpHandler {
>   (...)
>
>   def handleRequest(exchange: HttpServerExchange): Unit = {
>     // Move off the IO thread first, since the function makes a blocking call.
>     if (exchange.isInIoThread) {
>       exchange.dispatch(this)
>       return
>     }
>     // Now on a worker thread: read the full request body, then invoke the function.
>     exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
>       flinkHandler
>         .handle(Slices.wrap(bytes))
>         .whenComplete((response: Slice, exception: Throwable) =>
>           onComplete(exchange, response, exception))
>     })
>   }
>
>   def onComplete(exchange: HttpServerExchange, slice: Slice, throwable: Throwable) =
>     (... as per the example)
>
> }
>
> Many thanks again for your help, Igal
>
> On Wed, 27 Oct 2021 at 13:59, Filip Karnicki <filip.karni...@gmail.com>
> wrote:
>
>> Thanks for your reply Igal
>>
>> The reason why I'm using data stream integration is that the messages on
>> kafka are in .json, and I need to convert them to protobufs for embedded
>> functions. If I was using remote functions I wouldn't need to do that.
>>
>> With regards to performance, in order to exclude the possibility that
>> it's the remote service that's causing a slowdown, I replaced the undertow
>> example from the docs with 5 instances of webflux services that hand off
>> the work from an event loop to a worker which then sleeps for 1 second. I
>> then launched an nginx instance to forward the request in a round robin
>> fashion to the 5 webflux instances.
>>
>> When I push 10_000 messages onto the ingress kafka topic, it takes
>> upwards of 100 seconds to process all messages. The flink cluster first
>> works pretty hard for about 30 seconds (at ~100% cpu utilisation), then
>> everything slows down and eventually only tens of messages trickle
>> through. Finally the flink-side statefun job (not the remote job) crashes
>> and gets restarted, and the last few stragglers reach the egress 120+
>> seconds after the launch of the test.
>>
>> I can try to replicate this outside of my work environment if you'd like
>> to run it yourself, but in the meantime, is there a way to achieve this
>> 'sequential-per-key' behaviour with the use of embedded functions? Those
>> seem to be rock-solid. Maybe there are some internal classes that would at
>> least provide a template on how to do it? I have a naive implementation
>> ready (the one I described in the previous email) but I'm sure there are
>> some edge cases I haven't thought of.
>>
>> Thanks again,
>> Fil
>>
>>
>>
>> On Wed, 27 Oct 2021 at 09:43, Igal Shilman <igal.shil...@gmail.com>
>> wrote:
>>
>>> Hello Fil,
>>>
>>> Indeed what you are describing is exactly what a remote function does.
>>>
>>> I am curious to learn more about the current performance limitations
>>> that you encounter with the remote functions.
>>>
>>> One thing to try in combination with the async transport is to increase
>>> the total number of in-flight async operations by setting the following
>>> property in flink-conf.yaml:
>>>
>>> statefun.async.max-per-task
>>>
>>> to a value much higher than 1024. Try experimenting with 16k, 32k, 64k and
>>> even higher.
>>>
>>> Let me know if that improves the situation, and we can continue from
>>> there.
>>>
>>> p.s,
>>>
>>> You've mentioned that you are using the data stream integration. Was
>>> there any particular reason you chose that? It has some limitations at the
>>> moment with respect to remote functions.
>>>
>>>
>>> Cheers,
>>> Igal
>>>
>>> On Wed 27. Oct 2021 at 08:49, Filip Karnicki <filip.karni...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> I have a kafka topic with json messages that I map to protobufs within
>>>> a data stream, and then send those to embedded stateful functions using the
>>>> datastream integration api (DataStream[RoutableMessage]). From there I need
>>>> to make an idempotent long-running blocking IO call.
>>>>
>>>> I noticed that I was processing messages sequentially per kafka
>>>> partition. Is there a way that I could process them sequentially by key
>>>> only (but in parallel per partition)?
>>>>
>>>> I created some code that uses the embedded functions'
>>>> registerAsyncOperation capabilities to make my long-running IO calls
>>>> effectively asynchronous, but I had to add all this custom code to enqueue
>>>> and persist any messages for a key that came in while there was an
>>>> in-flight IO call happening for that key. I'm fairly confident that I can
>>>> figure out all the fault tolerance cases _eventually_ (including re-sending
>>>> the in-flight message upon getting the UNKNOWN status back from the async
>>>> operation).
>>>>
>>>> That said, am I missing a trick that would allow Flink/statefun to take
>>>> care of this "parallel per partition, sequential-per-key" behaviour? Remote
>>>> functions don't seem to have the performance we need, even with async http
>>>> transport.
>>>>
>>>> Many thanks!
>>>> Fil
>>>>
>>> --
>>>
>>> ---
>>>
>>> about.me/igalshilman
>>>
>>>
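
Note on the "parallel per partition, sequential-per-key" behaviour asked
about in this thread: the idea can be illustrated outside of Flink with plain
JDK futures. The following is only a minimal sketch under stated assumptions,
not the code Fil wrote and not anything provided by statefun. The class name
PerKeySequencer and its submit method are hypothetical, and everything is kept
in memory, so none of the fault tolerance discussed above (persisting queued
messages, re-sending after an UNKNOWN status) is covered.

```scala
import java.util.concurrent.{CompletableFuture, Executor}
import scala.collection.mutable

/** Hypothetical helper: runs tasks one at a time per key, while tasks for
  * different keys may run in parallel on the given executor.
  * In-memory only: queued work is lost if the process crashes. */
final class PerKeySequencer[K](executor: Executor) {

  // Last scheduled task for each key; the next task for that key is chained after it.
  private val tails = mutable.Map.empty[K, CompletableFuture[AnyRef]]

  def submit[T](key: K)(task: () => T): CompletableFuture[T] = this.synchronized {
    val previous =
      tails.getOrElse(key, CompletableFuture.completedFuture[AnyRef](null))

    // Start the task only once the previous task for this key has finished,
    // whether that one succeeded or failed.
    val result: CompletableFuture[T] =
      previous.handleAsync[T]((_: AnyRef, _: Throwable) => task(), executor)

    // The new tail completes when `result` does, but carries neither its value nor its failure.
    val tail: CompletableFuture[AnyRef] =
      result.handle[AnyRef]((_: T, _: Throwable) => null)
    tails(key) = tail

    // Drop the entry once the key is idle so the map does not grow without bound.
    tail.whenComplete((_: AnyRef, _: Throwable) => this.synchronized {
      if (tails.get(key).contains(tail)) tails -= key
    })

    result
  }
}
```

With a thread pool as the executor, calling submit("a")(...) several times
runs those calls strictly in submission order, while a submit("b")(...) made
in between does not wait for them. A blocking IO call would go inside the
task, which is why the executor should be a worker pool rather than an IO or
event-loop thread.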
