Re: Enrichment with keyed state as cache

2025-06-27 Thread Lukasz Krawiec
Hi, At one point I explored using OperatorState and implementing the CheckpointedFunction interface in the class that extends RichAsyncFunction. It seemed to work on my local machine, but I never tried it in production. On Tue, Jun 24, 2025 at 2:28 AM Jean-Marc Paulin wrote: > Hi Nate, > > We deleg

Re: Enrichment with keyed state as cache

2025-06-25 Thread Lasse Nedergaard
Best regards, Lasse Nedergaard. On 23 Jun 2025 at 22:40, Gabor Somogyi wrote: Hi Nate, I'm afraid we're in a situation where there is no off-the-shelf solution. An AsyncLookupFunction can be manually implemented based on AbstractRichFunction which does the async lookup. The tricky p

Re: Enrichment with keyed state as cache

2025-06-24 Thread Lasse Nedergaard
Hi Nate, We are doing enrichment in the pipeline as well. We started out doing it in the keyed process function, and I can’t recommend going that route. You can’t make it async because you have to return from the calling function, and blocking the process function causes many problems. Checkpoints is one

Re: Enrichment with keyed state as cache

2025-06-24 Thread Jean-Marc Paulin
Hi Nate, We delegate the enrichment to another microservice over Kafka, and process the Kafka response when we receive it. We use a KeyedCoProcessFunction to post the Kafka request using a side output in the processElement1 handler, and we collect the response in the processElement2 handler. We ke
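
For readers skimming the archive, here is a rough sketch of the request/response shape described above. It is a plain-Java model, not Flink code: the class RequestResponseEnricher, its fields, and the buffering of waiting events are all assumptions made for illustration. Only the split between processElement1 (post the request) and processElement2 (collect the response) comes from the message. In a real job the two maps would presumably be keyed state and the requests list a side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two-input pattern; NOT the Flink API.
// All names here are hypothetical and chosen for illustration only.
class RequestResponseEnricher {
    // Stands in for the side output that feeds the request topic.
    final List<String> requests = new ArrayList<>();
    // Stands in for the main output of enriched events.
    final List<String> enriched = new ArrayList<>();
    // Assumption: enrichment values are cached per key (keyed state in Flink).
    private final Map<String, String> cache = new HashMap<>();
    // Assumption: events wait here until the response for their key arrives.
    private final Map<String, List<String>> pending = new HashMap<>();

    // First input: raw events.
    void processElement1(String key, String event) {
        String value = cache.get(key);
        if (value != null) {
            enriched.add(event + "|" + value); // cache hit, no request needed
            return;
        }
        List<String> waiting = pending.computeIfAbsent(key, k -> new ArrayList<>());
        if (waiting.isEmpty()) {
            requests.add(key); // only one outstanding request per key
        }
        waiting.add(event);
    }

    // Second input: responses from the enrichment service.
    void processElement2(String key, String value) {
        cache.put(key, value);
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String event : waiting) {
                enriched.add(event + "|" + value);
            }
        }
    }

    public static void main(String[] args) {
        RequestResponseEnricher op = new RequestResponseEnricher();
        op.processElement1("dev1", "e1");
        op.processElement1("dev1", "e2");
        op.processElement2("dev1", "EU");
        op.processElement1("dev1", "e3");
        System.out.println(op.requests + " " + op.enriched);
        // prints: [dev1] [e1|EU, e2|EU, e3|EU]
    }
}
```

Nothing blocks in either handler, which is the point of the design: the wait for the external service happens outside the operator.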

Re: Enrichment with keyed state as cache

2025-06-23 Thread Gabor Somogyi
Hi Nate, I'm afraid we're in a situation where there is no off-the-shelf solution. An AsyncLookupFunction can be manually implemented based on AbstractRichFunction which does the async lookup. The tricky part is to fire threads for lookup and process result in mailbox thread because state access i
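
The threading split mentioned above (lookups on worker threads, results handled on the mailbox thread) can be modelled in plain Java roughly as follows. This is only a sketch and uses no Flink API: MailboxLookupSketch, slowLookup, and the queue standing in for the mailbox are hypothetical names, and the HashMap stands in for state. The demo drains the mailbox after each element to keep the output deterministic, which a real operator would not do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model, NOT Flink code. All names are hypothetical.
class MailboxLookupSketch {
    // Stands in for state: read and written only on the "mailbox" thread.
    private final Map<String, String> cache = new HashMap<>();
    // Worker threads post completed lookups here as actions to run later.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final ExecutorService lookupPool = Executors.newFixedThreadPool(2);
    final AtomicInteger remoteCalls = new AtomicInteger();
    private int inFlight = 0; // touched only on the mailbox thread

    // Stand-in for the external HTTP call; runs on a worker thread.
    private String slowLookup(String key) {
        remoteCalls.incrementAndGet();
        return key.toUpperCase();
    }

    // Runs on the mailbox thread for every input element.
    void processElement(String key, List<String> out) {
        String cached = cache.get(key);
        if (cached != null) {
            out.add(key + "=" + cached);
            return;
        }
        inFlight++;
        lookupPool.submit(() -> {
            String value = slowLookup(key); // no state access on this thread
            mailbox.add(() -> {             // executed later on the mailbox thread
                cache.put(key, value);
                out.add(key + "=" + value);
                inFlight--;
            });
        });
    }

    // Runs queued actions on the calling (mailbox) thread until none are pending.
    private void drainMailbox() {
        try {
            while (inFlight > 0) {
                mailbox.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for lookups", e);
        }
    }

    List<String> run(List<String> keys) {
        List<String> out = new ArrayList<>();
        try {
            for (String key : keys) {
                processElement(key, out);
                drainMailbox(); // demo-only simplification for deterministic output
            }
        } finally {
            lookupPool.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        MailboxLookupSketch op = new MailboxLookupSketch();
        System.out.println(op.run(List.of("a", "b", "a")) + " remote calls: " + op.remoteCalls.get());
        // prints: [a=A, b=B, a=A] remote calls: 2
    }
}
```

The worker thread never touches the cache; it only enqueues an action, so every read and write of the cache happens on one thread.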

Enrichment with keyed state as cache

2025-06-23 Thread Nate Drake
Hi, We're building a Flink job to process metric data from client devices. We need to enrich these events via an external HTTP API. We were thinking we'd use Flink state as a cache of this enrichment data to reduce the load on the external service. It seems AsyncFunctions do not support keyed st