Hi,

At one point I explored using operator state and implementing the CheckpointedFunction interface in the class that extends RichAsyncFunction. It seemed to work on my local machine, but I never tried it in production.
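Here is a rough, framework-free sketch of that idea's bookkeeping only, under some assumptions. The cache is a field of the RichAsyncFunction. snapshot() stands in for what snapshotState() would write into operator ListState, and restore() stands in for what initializeState() would read back. The class name CheckpointableCache is hypothetical, the Flink wiring (state descriptors, the async HTTP client) is left out, and Java 17 is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free model of a cache held by a RichAsyncFunction
// that also implements CheckpointedFunction.
class CheckpointableCache<K, V> {
    // Concurrent map on the assumption that the async HTTP callback may write
    // to the cache from its own thread while the task thread reads it.
    private final Map<K, V> entries = new ConcurrentHashMap<>();

    V get(K key) {
        return entries.get(key);
    }

    void put(K key, V value) {
        entries.put(key, value);
    }

    int size() {
        return entries.size();
    }

    // Stand-in for snapshotState(): copy the cache into a flat list,
    // the shape that operator ListState stores.
    List<Map.Entry<K, V>> snapshot() {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> e : entries.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        return out;
    }

    // Stand-in for initializeState(): rebuild the cache from whatever entries
    // this subtask was handed. A partial list is fine for a cache, since a
    // missing entry just falls through to the external service.
    void restore(Iterable<Map.Entry<K, V>> saved) {
        entries.clear();
        for (Map.Entry<K, V> e : saved) {
            entries.put(e.getKey(), e.getValue());
        }
    }
}
```

One design point: with plain operator ListState the saved entries are split across subtasks when the job is rescaled. That is acceptable for a cache, because each lost entry only costs one extra call to the external API.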
On Tue, Jun 24, 2025 at 2:28 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:

> Hi Nate,
>
> We delegate the enrichment to another microservice over Kafka, and process
> the Kafka response when we receive it. We use a KeyedCoProcessFunction to
> post the Kafka request using a side output in the processElement1 handler,
> and we collect the response in the processElement2 handler. We keep a
> request/response id in the keyed state.
>
> This is far from perfect and introduces extra latency, but in our case the
> enrichment queries can take a few seconds. We still need to handle Flink
> restarts and the fact that we may end up processing the same message twice
> if we restart from a previous savepoint.
>
> Just an idea. We already use Kafka extensively, so that was not a major
> challenge for us.
>
> Hope this helps
>
> JM
>
> On Mon, Jun 23, 2025 at 8:47 PM Nate Drake <ndr...@gmail.com> wrote:
>
>> Hi,
>>
>> We're building a Flink job to process metric data from client devices. We
>> need to enrich these events via an external HTTP API. We were thinking
>> we'd use Flink state as a cache of this enrichment data to reduce the load
>> on the external service. It seems AsyncFunctions do not support keyed
>> state at the moment, so that's out.
>>
>> I've found this AWS blog post describing a possible solution:
>> https://aws.amazon.com/blogs/big-data/implement-apache-flink-real-time-data-enrichment-patterns/
>>
>> In the post, they're making synchronous HTTP calls in a
>> KeyedProcessFunction, and it seems to perform well. I've read conflicting
>> information on this pattern, though: answers to similar questions on Stack
>> Overflow say that performing synchronous requests like this is bad because
>> it can block checkpointing, etc.
>>
>> Are there any recommended patterns to do something like this without
>> compromising Flink's fault tolerance? Our enrichment data is somewhat
>> expensive to build and would be requested pretty frequently, but is fairly
>> long-lived (~24 hr TTL).
>> So, caching is a requirement to avoid negatively impacting the other
>> system. I suppose we could implement caching between the API and Flink,
>> but we were hoping to avoid something like that.
>>
>> Thanks!
>>
>> Nate

--
Best,
Lukasz Krawiec
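A framework-free sketch of the request/response correlation Jean-Marc describes follows. It rests on assumptions that go beyond his message. Each event gets a random request id. Pending events are kept per key in a map, which in Flink would be MapState scoped to the current key of the KeyedCoProcessFunction. The two lists stand in for the side output and the main collector. The class and record names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of a KeyedCoProcessFunction that sends enrichment
// requests via a side output and joins the replies arriving on a second input.
class EnrichmentCorrelator {
    record Request(String key, String requestId, String payload) {}
    record Response(String key, String requestId, String enrichment) {}

    // key -> (requestId -> original event); the inner map plays the role of
    // per-key MapState.
    private final Map<String, Map<String, String>> pending = new HashMap<>();
    final List<Request> sideOutput = new ArrayList<>(); // stand-in for ctx.output(tag, ...)
    final List<String> enriched = new ArrayList<>();    // stand-in for out.collect(...)

    // Corresponds to processElement1: remember the event, emit a request.
    void processEvent(String key, String event) {
        String requestId = UUID.randomUUID().toString();
        pending.computeIfAbsent(key, k -> new HashMap<>()).put(requestId, event);
        sideOutput.add(new Request(key, requestId, event));
    }

    // Corresponds to processElement2: join the reply with the pending event.
    void processResponse(Response r) {
        Map<String, String> forKey = pending.get(r.key());
        String event = forKey == null ? null : forKey.remove(r.requestId());
        if (event == null) {
            // Unknown or already-handled id (a duplicate delivery, or a reply to
            // a request issued after the checkpoint we restored from): drop it.
            return;
        }
        enriched.add(event + "|" + r.enrichment());
        if (forKey.isEmpty()) {
            pending.remove(r.key()); // keep state small once nothing is pending
        }
    }
}
```

Dropping replies whose id is not pending is safe after a restore, because the replayed input event issues a fresh request. It does not by itself remove the duplicate downstream output mentioned above when restarting from an older savepoint; that would still need handling downstream, for example with an idempotent sink.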
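Nate describes a pattern from the AWS post. Below is a minimal framework-free model of the per-key cache it implies, with the ~24 hr TTL from his message. It makes three assumptions. lookup is a hypothetical blocking client for the external HTTP API. Time is passed in explicitly so the sketch is testable. The map stands in for ValueState. In Flink itself the expiry would more likely come from a StateTtlConfig attached with ValueStateDescriptor#enableTimeToLive (update type OnCreateAndWrite, visibility NeverReturnExpired), which this sketch only imitates.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a KeyedProcessFunction that caches enrichment data
// per key and calls the external API synchronously on a miss.
class TtlEnrichmentCache {
    private record Entry(String value, long writtenAtMillis) {}

    private final Map<String, Entry> state = new HashMap<>(); // stand-in for per-key ValueState
    private final long ttlMillis;
    private final Function<String, String> lookup; // hypothetical blocking API client
    int lookups = 0; // number of calls made to the external service

    TtlEnrichmentCache(Duration ttl, Function<String, String> lookup) {
        this.ttlMillis = ttl.toMillis();
        this.lookup = lookup;
    }

    String enrich(String key, long nowMillis) {
        Entry e = state.get(key);
        // TTL counts from the last write only; reads do not extend it.
        if (e != null && nowMillis - e.writtenAtMillis() < ttlMillis) {
            return e.value(); // cache hit: no external call
        }
        lookups++;
        String value = lookup.apply(key); // miss or expired: one blocking call
        state.put(key, new Entry(value, nowMillis));
        return value;
    }
}
```

Each miss is one synchronous call on the task thread. While it runs, that subtask cannot process other records or checkpoint barriers, which is the checkpointing concern raised in the Stack Overflow answers. A long TTL with a high hit rate keeps those stalls rare, but a slow or unavailable API will still back-pressure the job.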