Hi Nate

We are doing enrichment in the pipeline as well. We started out doing it in a KeyedProcessFunction, and I can’t recommend going that route. You can’t make it async, because you have to return from the calling function, and blocking the process function causes many problems; checkpointing is one of them. The only workaround within a keyed process function is to use a timer.
I would go with the async function instead, and perhaps store the results in Kafka so you can ingest them earlier in the pipeline.

But don’t use the keyed process function to make HTTP calls.
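The timer workaround can be read in more than one way. Below is one hypothetical interpretation, written in plain Java rather than against Flink's `KeyedProcessFunction`. `processElement` never blocks. It starts the lookup on another thread and buffers the event. `onTimer` is assumed to run on the same task thread, and it polls for finished lookups before it touches any state. `TimerPollingEnricher`, its fields and the `UNKNOWN` fallback are made up for illustration. The in-flight futures and buffered events are ordinary Java fields, not Flink state, so anything still pending at a failure would be lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical plain-Java model of the timer workaround (not Flink's KeyedProcessFunction API).
// processElement() and onTimer() are assumed to be called from one task thread.
class TimerPollingEnricher {
    private final Function<String, CompletableFuture<String>> lookup;     // stand-in for an async HTTP call
    private final Map<String, CompletableFuture<String>> inFlight = new HashMap<>();
    private final Map<String, List<String>> buffered = new HashMap<>();  // events awaiting their lookup
    private final Map<String, String> cache = new HashMap<>();           // stands in for keyed state
    final List<String> output = new ArrayList<>();                       // stands in for the Collector

    TimerPollingEnricher(Function<String, CompletableFuture<String>> lookup) {
        this.lookup = lookup;
    }

    /** Never blocks: emits from the cache, or buffers the event and starts at most one lookup per key. */
    void processElement(String key, String event) {
        String meta = cache.get(key);
        if (meta != null) {
            output.add(event + "|" + meta);
            return;
        }
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        inFlight.computeIfAbsent(key, lookup); // real code would also register a processing-time timer here
    }

    /** Polls finished lookups on the task thread; returns true if the timer should fire again. */
    boolean onTimer() {
        Iterator<Map.Entry<String, CompletableFuture<String>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, CompletableFuture<String>> entry = it.next();
            CompletableFuture<String> future = entry.getValue();
            if (!future.isDone()) {
                continue; // not finished yet: check again on the next timer
            }
            String key = entry.getKey();
            it.remove();
            String meta = future.isCompletedExceptionally() ? null : future.join();
            if (meta != null) {
                cache.put(key, meta); // state is written here, never on the lookup thread
            }
            for (String event : buffered.remove(key)) {
                output.add(event + "|" + (meta != null ? meta : "UNKNOWN")); // hypothetical fallback
            }
        }
        return !inFlight.isEmpty();
    }
}
```

Each key gets at most one outstanding lookup. Events that arrive for a key while its lookup is running are held back and emitted together once a timer sees the result.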

Med venlig hilsen / Best regards
Lasse Nedergaard


On 23 Jun 2025 at 22:40, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:


Hi Nate,

I'm afraid we're in a situation where there is no off-the-shelf solution.

An AsyncLookupFunction that does the async lookup can be implemented manually, based on AbstractRichFunction.
The tricky part is firing the lookups on separate threads and processing the results in the mailbox thread, because state access is not allowed from external threads.
Timeouts also need to be handled, so it's not a trivial task...
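The threading rule described above can be modelled in plain Java. This is a minimal sketch under stated assumptions and does not use Flink's API. The single-thread `mailbox` executor stands in for the task's mailbox thread, and it is the only thread that reads or writes the `state` map. Lookups run on a separate `ioPool`, and `orTimeout` (Java 9+) bounds how long each one may take. `MailboxEnricher`, the pool size, the `UNKNOWN` fallback and the timeout value are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical plain-Java model of "lookup on external threads, state access on the mailbox thread".
class MailboxEnricher {
    static final String FALLBACK = "UNKNOWN"; // hypothetical value returned on timeout or failure

    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(); // sole owner of `state`
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);      // runs the blocking lookups
    private final Map<String, String> state = new HashMap<>();                   // touched only on `mailbox`
    private final Function<String, String> lookup;                              // stand-in for the HTTP call
    private final long timeoutMillis;

    MailboxEnricher(Function<String, String> lookup, long timeoutMillis) {
        this.lookup = lookup;
        this.timeoutMillis = timeoutMillis;
    }

    /** Non-blocking: returns a future that completes with the cached, fetched, or fallback value. */
    CompletableFuture<String> enrich(String key) {
        return CompletableFuture
                .supplyAsync(() -> state.get(key), mailbox) // state read on the mailbox thread
                .thenCompose(cached -> cached != null
                        ? CompletableFuture.completedFuture(cached)
                        : fetch(key));
    }

    private CompletableFuture<String> fetch(String key) {
        return CompletableFuture
                .supplyAsync(() -> lookup.apply(key), ioPool)    // external thread: no state access here
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // fails with TimeoutException if too slow
                .handleAsync((value, error) -> {                 // runs back on the mailbox thread
                    if (error != null) {
                        return FALLBACK;                         // timeout or failure: nothing cached
                    }
                    state.put(key, value);                       // state write on the mailbox thread
                    return value;
                }, mailbox);
    }

    void shutdown() {
        ioPool.shutdownNow(); // interrupts lookups that are still running
        mailbox.shutdown();
    }
}
```

`enrich(key).join()` can be called from any thread. It returns the cached value, the freshly looked-up value, or the fallback when the lookup exceeds the timeout. Call `shutdown()` to stop both executors.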

Let's see how the async part shapes up, but sooner or later we'll need to add async lookup, because it's a common pattern.

BR,
G


On Mon, Jun 23, 2025 at 9:47 PM Nate Drake <ndr...@gmail.com> wrote:
Hi,

We're building a Flink job to process metric data from client devices. We need to enrich these events via an external HTTP API.  We were thinking we'd use Flink state as a cache of this enrichment data to reduce the load on the external service.  It seems AsyncFunctions do not support keyed state at the moment, so that's out.  


In the post, they're making synchronous HTTP calls in a KeyedProcessFunction, and it seems to perform well. However, I've read conflicting information about this pattern in similar questions on Stack Overflow, saying that synchronous requests like this are bad because they can block checkpointing, etc.

Are there any recommended patterns for doing something like this without compromising Flink's fault tolerance? Our enrichment data is somewhat expensive to build and would be requested pretty frequently, but it is fairly long-lived (~24 hr TTL). So caching is a requirement to avoid negatively impacting the other system. I suppose we could implement caching between the API and Flink, but we were hoping to avoid something like that.
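One way to meet the caching requirement without keyed state is an in-memory TTL cache held by each async-function instance. The sketch below is plain Java and is not a Flink API. It assumes a hypothetical `loader` that returns a `CompletableFuture`, for example by wrapping an async HTTP client. Concurrent requests for the same key share a single in-flight future, failed loads are retried on the next request, and entries are reloaded once they are older than the TTL. Unlike Flink state, this cache is not checkpointed, so it starts empty after a restart. It also has no size bound, so a real version would need eviction.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical per-instance TTL cache in front of an async lookup (plain Java, not Flink state).
class TtlAsyncCache<K, V> {
    private static final class Entry<T> {
        final CompletableFuture<T> future;
        final long loadedAtNanos;

        Entry(CompletableFuture<T> future, long loadedAtNanos) {
            this.future = future;
            this.loadedAtNanos = loadedAtNanos;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> loader; // must not call get() recursively
    private final long ttlNanos;
    private final LongSupplier clock;                        // System::nanoTime in production

    TtlAsyncCache(Function<K, CompletableFuture<V>> loader, Duration ttl, LongSupplier clock) {
        this.loader = loader;
        this.ttlNanos = ttl.toNanos();
        this.clock = clock;
    }

    /** Returns the cached future if it is fresh and not failed; otherwise starts one new load. */
    CompletableFuture<V> get(K key) {
        long now = clock.getAsLong();
        // compute() is atomic per key, so concurrent callers share a single in-flight load
        Entry<V> entry = entries.compute(key, (k, old) -> {
            boolean reusable = old != null
                    && now - old.loadedAtNanos < ttlNanos
                    && !old.future.isCompletedExceptionally(); // failed loads are retried
            return reusable ? old : new Entry<>(loader.apply(k), now);
        });
        return entry.future;
    }
}
```

Inside a `RichAsyncFunction`, `asyncInvoke` could call `get(key)` and complete its `ResultFuture` from `whenComplete`, creating the cache in `open()`. With a 24-hour TTL, each key would hit the external service at most about once per day per parallel instance, plus retries after failures.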

Thanks!

Nate



