Hi Vamsi,

In short, yes: the `KeyedProcessFunction` will process the next record of
the same key before the async response arrives. It only guarantees that the
synchronous part of `processElement` runs serially, one record at a time.
For now we provide a similar guarantee, strictly ordered processing of
same-key records, for async state access, but not for custom async logic.
We may add that in the future.
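
To make the ordering concrete, here is a minimal plain-Java simulation. It is
not Flink code, and the class, method, and key names are made up for
illustration. Two records of the same key are handed to a
`processElement`-like method one after the other, and the simulated DB
responses complete only afterwards, so both synchronous parts finish before
either callback runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java simulation, not Flink code: all names here are hypothetical.
class AsyncOrderingSketch {
    // Ordered trace of what happened, so the interleaving can be inspected.
    static final List<String> trace = new ArrayList<>();
    // Futures standing in for in-flight Cassandra queries.
    static final List<CompletableFuture<String>> inFlight = new ArrayList<>();

    // Stand-in for processElement: the synchronous part runs to completion,
    // while the callback is only registered here and runs later.
    static void processElement(String key, String record) {
        trace.add("processElement start " + key + "/" + record);
        CompletableFuture<String> future = new CompletableFuture<>();
        inFlight.add(future);
        future.thenAccept(
            response -> trace.add("callback " + key + "/" + record + " got " + response));
        trace.add("processElement return " + key + "/" + record);
    }

    static List<String> run() {
        trace.clear();
        inFlight.clear();
        // The caller hands over records of the same key one at a time.
        processElement("k1", "r1");
        processElement("k1", "r2");
        // The simulated database answers only after both records were handed over.
        for (int i = 0; i < inFlight.size(); i++) {
            inFlight.get(i).complete("row" + (i + 1));
        }
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints the start/return pairs of both records first and the
two callbacks last, which is the interleaving described above.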

And FLIP-519[1] will introduce ordered record processing in a non-keyed
context. You may check whether it meets your needs.
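
The general idea of keeping same-key async work in order can be sketched in
plain Java by chaining futures per key. This is only an illustration under
my own assumptions: it is not the FLIP-519 design or any Flink API, all
names are hypothetical, and it ignores state access, threading, and
checkpointing concerns that matter inside a real operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java sketch of key-ordered async execution; hypothetical names only.
class KeyOrderedSketch {
    // Last future in the chain of async work for each key.
    // Not thread-safe and never cleaned up: fine for a sketch, not for production.
    private final Map<String, CompletableFuture<Void>> tails = new HashMap<>();

    // Starts `work` only after all earlier work for the same key has completed.
    // If earlier work fails, later work for that key is never started.
    CompletableFuture<Void> submit(String key, Supplier<CompletableFuture<Void>> work) {
        CompletableFuture<Void> previous =
            tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        CompletableFuture<Void> next = previous.thenCompose(ignored -> work.get());
        tails.put(key, next);
        return next;
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Map<String, CompletableFuture<Void>> pending = new HashMap<>();
        KeyOrderedSketch ordered = new KeyOrderedSketch();

        for (String id : List.of("k1/r1", "k1/r2", "k2/r1")) {
            String key = id.substring(0, 2);
            ordered.submit(key, () -> {
                trace.add("start " + id);            // the async request is issued here
                CompletableFuture<Void> f = new CompletableFuture<>();
                pending.put(id, f);                  // completed later by the "database"
                return f;
            });
        }
        // k1/r2 has not started yet; it starts once k1/r1 gets its response.
        trace.add("response k1/r1");
        pending.get("k1/r1").complete(null);
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the first records of `k1` and `k2` start right away, while
the second record of `k1` starts only after the first one has received its
response.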

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode


Best,
Zakelly


On Sun, May 18, 2025 at 4:00 AM Vamsi Aws <vamsiaw...@gmail.com> wrote:

> Hi Team,
>
> Could you please answer the question below?
>
> When we call a Cassandra DB asynchronously in a `KeyedProcessFunction`
> operator, will the operator process the next event of the same key before
> the response from the DB is received?
>
> From the code below, our understanding is that it will process the next
> event of the same key. Is that understanding accurate, or are we doing
> something wrong?
>
> Thank you for answering.
> ```
>
> @Slf4j
> @Builder
> public class SCKeyedProcessFunction
>     extends KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations> {
>
>   private OutputTag<EventWithAssociations> existingEvents;
>
>   private transient ValueState<SCCache> cachestate;
>
>   @Override
>   public void processElement(
>       EventWithAssociations eventWithAssociations,
>       KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations>.Context context,
>       Collector<EventWithAssociations> collector)
>       throws Exception {
>     log.info(
>         "Processing event in SCKeyedProcessFunction : {}",
>         eventWithAssociations.getTsvSefsEvent().getPayloadTxt());
>     // Check cache
>     try {
>       SCCache scCache = cachestate.value();
>
>       if (scCache == null) {
>         // Cache miss: Query Cassandra asynchronously
>         log.info(
>             "scCache is null for System_Id : {}",
>             eventWithAssociations.getEvent().getSystemId());
>         CompletableFuture<Cache> future = queryCassandraAsync(eventWithAssociations);
>
>         future.thenAccept(
>             scEvent -> {
>               try {
>                 if (scEvent != null) {
>                   log.info("scEvent is received from DB : {}", scEvent);
>                   // Update cache and process the event if necessary
>                   cachestate.update(scEvent);
>                   processEventWithCache(eventWithAssociations, scEvent, collector, context);
>                 } else {
>                   log.info("This is first event hence collecting it : {}", scEvent);
>                   // If no result from Cassandra, collect the event
>                   collector.collect(eventWithAssociations);
>                 }
>               } catch (Exception e) {
>                 throw new RuntimeException("Error updating cache or processing event", e);
>               }
>             });
>       } else {
>         // Cache hit: Process event with cached value
>         processEventWithCache(eventWithAssociations, scCache, collector, context);
>       }
>     } catch (Exception e) {
>       log.error("Error in SCKeyedProcessFunction asyncInvoke: {}", e.getMessage(), e);
>       throw new RuntimeException(e);
>     }
>   }
>
