Hi Vamsi,

In short, yes: the `KeyedProcessFunction` will process the next record of the same key before the async response arrives. It only guarantees that the synchronous part of `processElement` is executed serially, one record at a time. For now, Flink offers similar functionality only for async state access, where records of the same key are processed strictly in order; it does not cover customized async logic. We may add that in the future.
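The behavior described above can be illustrated with a minimal plain-Java sketch. It is not Flink code: `AsyncOrderingSketch`, its simplified `processElement` signature, and the `trace` list are hypothetical stand-ins, and the futures are completed by hand to play the role of the database responses. It only shows that registering a callback with `thenAccept` lets the synchronous part return, so the next record of the same key is handled before the earlier response arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java stand-in for a keyed operator; none of this is Flink API.
class AsyncOrderingSketch {

    // Ordered log of what happened, so the interleaving is visible.
    static final List<String> trace = new ArrayList<>();

    // Hypothetical stand-in for processElement: it returns as soon as the
    // callback is registered, not when the lookup completes.
    static void processElement(String key, int seq, CompletableFuture<String> lookup) {
        trace.add("sync start " + key + "#" + seq);
        lookup.thenAccept(row -> trace.add("async done " + key + "#" + seq));
        trace.add("sync end " + key + "#" + seq);
    }

    static List<String> run() {
        trace.clear();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();

        // Two records of the same key are handed to the operator back to back.
        processElement("k1", 1, first);
        processElement("k1", 2, second);

        // The responses arrive later; here they even arrive in reverse order.
        second.complete("row-2");
        first.complete("row-1");
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch both synchronous parts finish before either callback runs, and the callbacks run in completion order rather than arrival order, which is why same-key ordering has to be enforced explicitly when it matters.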
And FLIP-519[1] will introduce ordered record processing in a non-keyed context. You may check whether it meets your needs.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode

Best,
Zakelly

On Sun, May 18, 2025 at 4:00 AM Vamsi Aws <vamsiaw...@gmail.com> wrote:

> Hi Team,
>
> Could you please answer the below:
>
> When we are asynchronously calling a cassandra DB in Keyed ProcessStream
> operator and before receiving response from DB , will the operator process
> next event of same key ?
>
> As per below code we understood, it will process the next event of same
> key, is it accurate understanding or we are doing it wrong ?
>
> Thank you for answering.
> ```
>
> @Slf4j
> @Builder
> public class SCKeyedProcessFunction
>     extends KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations> {
>
>   private OutputTag<EventWithAssociations> existingEvents;
>
>   private transient ValueState<SCCache> cachestate;
>
>   @Override
>   public void processElement(
>       EventWithAssociations eventWithAssociations,
>       KeyedProcessFunction<UUID, EventWithAssociations, EventWithAssociations>.Context context,
>       Collector<EventWithAssociations> collector)
>       throws Exception {
>     log.info(
>         "Processing event in SCKeyedProcessFunction : {}",
>         eventWithAssociations.getTsvSefsEvent().getPayloadTxt());
>     // Check cache
>     try {
>       SCCache scCache = cachestate.value();
>
>       if (scCache == null) {
>         // Cache miss: Query Cassandra asynchronously
>         log.info(
>             "scCache is null for System_Id : {}",
>             EventWithAssociations.getEvent().getSystemId());
>         CompletableFuture<Cache> future = queryCassandraAsync(eventWithAssociations);
>
>         future.thenAccept(
>             scEvent -> {
>               try {
>                 if (scEvent != null) {
>                   log.info("scEvent is received from DB : {}", scEvent);
>                   // Update cache and process the event if necessary
>                   cachestate.update(scEvent);
>                   processEventWithCache(eventWithAssociations, scEvent, collector, context);
>                 } else {
>                   log.info("This is first event hence collecting it : {}", scEvent);
>                   // If no result from Cassandra, collect the event
>                   collector.collect(eventWithAssociations);
>                 }
>               } catch (Exception e) {
>                 throw new RuntimeException("Error updating cache or processing event", e);
>               }
>             });
>       } else {
>         // Cache hit: Process event with cached value
>         processEventWithCache(eventWithAssociations, scCache, collector, context);
>       }
>     } catch (Exception e) {
>       log.error("Error in SCKeyedProcessFunction asyncInvoke: {}", e.getMessage(), e);
>       throw new RuntimeException(e);
>     }
>   }
>