Hi,

At one point I explored using operator state by implementing the
CheckpointedFunction interface in the class that extends RichAsyncFunction.
It seemed to work on my local machine, but I never tried it in production.
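
Here is a minimal, framework-free sketch of that idea. It assumes the cache is a plain in-memory map inside the async function. The names `SnapshottableCache`, `snapshot`, and `restore` are hypothetical. In a real job, `CheckpointedFunction#snapshotState` would copy the entries into a `ListState` obtained from the operator state store, and `initializeState` would rebuild the map from it. Plain Java is used here so the round trip can be checked without a Flink runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of an enrichment cache held inside an async function.
// In Flink, snapshot() would be called from CheckpointedFunction#snapshotState
// and restore() from CheckpointedFunction#initializeState.
class SnapshottableCache {
    // One cached enrichment value plus the time (ms) it was fetched.
    record Entry(String key, String value, long fetchedAtMillis) {}

    // ConcurrentHashMap because async callbacks may complete on other threads.
    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    void put(String key, String value, long nowMillis) {
        entries.put(key, new Entry(key, value, nowMillis));
    }

    String get(String key) {
        Entry e = entries.get(key);
        return e == null ? null : e.value();
    }

    // Flatten the map into a list, the shape that ListState stores.
    List<Entry> snapshot() {
        return new ArrayList<>(entries.values());
    }

    // Rebuild the map from whatever entries this subtask was handed on restore.
    void restore(List<Entry> restored) {
        entries.clear();
        for (Entry e : restored) {
            entries.put(e.key(), e);
        }
    }

    int size() {
        return entries.size();
    }
}
```

Keep in mind that operator state belongs to each parallel subtask, not to individual keys. Plain operator `ListState` is also split evenly across subtasks when the job is rescaled. After rescaling, a subtask may therefore restore entries for keys it no longer receives. For a cache, that only costs memory and some extra misses, not correctness.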

On Tue, Jun 24, 2025 at 2:28 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:

> Hi Nate,
>
> We delegate the enrichment to another microservice over Kafka and process
> the Kafka response when we receive it. We use a KeyedCoProcessFunction:
> the processElement1 handler posts the Kafka request through a side output,
> and the processElement2 handler collects the response. We keep a
> request/response id in keyed state.
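
The correlation step described above might be modeled roughly as follows. This sketch assumes the enrichment service echoes the request id back in its response. `PendingRequests`, `onEvent`, and `onResponse` are hypothetical names for a single key's logic. In the actual job, the map would live in keyed state (for example a `MapState`), `onEvent` would run in `processElement1` before the request is emitted to the side output, and `onResponse` would run in `processElement2`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical model of the KeyedCoProcessFunction correlation logic for ONE key.
// In Flink, `pending` would be keyed state (e.g. a MapState<String, String>).
class PendingRequests {
    private final Map<String, String> pending = new HashMap<>();

    // processElement1: remember the original event under a fresh request id
    // and return the id so the caller can publish the request to Kafka.
    String onEvent(String event) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, event);
        return requestId;
    }

    // processElement2: match the response to its original event, if still pending.
    Optional<String> onResponse(String requestId, String enrichment) {
        String event = pending.remove(requestId);
        if (event == null) {
            // Unknown or already-handled id, e.g. a duplicate response after a restart.
            return Optional.empty();
        }
        return Optional.of(event + " | " + enrichment);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The id is removed as soon as it is matched, so a response that arrives twice is simply dropped. This does not help with the other case mentioned, where the input message itself is replayed and a new request is sent.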
>
> This is far from perfect and introduces extra latency, but in our case the
> enrichment queries can take a few seconds. We still need to handle Flink
> restarts and the fact that we may end up processing the same message twice
> if we restart from a previous savepoint.
>
> Just an idea. We already use Kafka extensively, so that was not a major
> challenge for us.
>
> Hope this helps
>
> JM
>
>
> On Mon, Jun 23, 2025 at 8:47 PM Nate Drake <ndr...@gmail.com> wrote:
>
>> Hi,
>>
>> We're building a Flink job to process metric data from client devices. We
>> need to enrich these events via an external HTTP API.  We were thinking
>> we'd use Flink state as a cache of this enrichment data to reduce the load
>> on the external service.  It seems AsyncFunctions do not support keyed
>> state at the moment, so that's out.
>>
>> I've found this AWS blog describing a possible solution:
>> https://aws.amazon.com/blogs/big-data/implement-apache-flink-real-time-data-enrichment-patterns/
>>
>>
>> In the post, they make synchronous HTTP calls in a KeyedProcessFunction,
>> and it seems to perform well. However, I've read conflicting information
>> about this pattern in similar questions on Stack Overflow, which say that
>> making synchronous requests like this is bad because it can block
>> checkpointing, among other things.
>>
>> Are there any recommended patterns for doing something like this without
>> compromising Flink's fault tolerance? Our enrichment data is somewhat
>> expensive to build and would be requested pretty frequently, but it is
>> fairly long-lived (~24 hr TTL). So caching is a requirement to avoid
>> negatively impacting the other system. I suppose we could implement caching
>> between the API and Flink, but we were hoping to avoid something like that.
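
For the ~24 h TTL requirement, a cache-aside lookup could look something like this sketch. `TtlEnrichmentCache` is hypothetical, and its loader function stands in for the HTTP call. The clock is injected so that expiry can be tested. In a KeyedProcessFunction, the map would instead be keyed `ValueState`, and Flink's state TTL (`StateTtlConfig`) could handle expiry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical cache-aside lookup with a fixed time-to-live.
// The clock is injected so expiry can be tested without waiting 24 hours.
class TtlEnrichmentCache {
    static final long TTL_MILLIS = 24L * 60 * 60 * 1000; // ~24 h, per the thread

    private record Cached(String value, long expiresAtMillis) {}

    private final Map<String, Cached> cache = new HashMap<>();
    private final Function<String, String> loader; // stands in for the HTTP call
    private final LongSupplier clock;

    TtlEnrichmentCache(Function<String, String> loader, LongSupplier clock) {
        this.loader = loader;
        this.clock = clock;
    }

    String lookup(String key) {
        long now = clock.getAsLong();
        Cached c = cache.get(key);
        if (c != null && now < c.expiresAtMillis()) {
            return c.value(); // fresh hit: no call to the external service
        }
        String value = loader.apply(key); // miss or expired: fetch and re-cache
        cache.put(key, new Cached(value, now + TTL_MILLIS));
        return value;
    }
}
```

On a miss, `lookup` calls the loader synchronously. That is exactly the blocking behavior the Stack Overflow answers warn about; the cache only reduces how often it happens.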
>>
>> Thanks!
>>
>> Nate
>>

-- 
Best,
Lukasz Krawiec
