Hello Nicolaus,

Thank you for your quick feedback, and sorry if I was not clear enough.

Actually, in the documented example, the state updated in the snapshotState method is an operator state, not a keyed state:

    public void initializeState(FunctionInitializationContext context) throws Exception {
        [...]
        countPerPartition = context.getOperatorStateStore().getOperatorState(
            new ListStateDescriptor<>("perPartitionCount", Long.class));
        [...]
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        [...]
        countPerPartition.add(localCount);
    }

It seems that the method is then called only once per parallel operator task, not once per key.

On my side, I have two keyed states with the same key (e.g., userId) in a CoFlatMapFunction:

    // Control state partitioned by userId
    private ValueState<Control> controlState;

    // Data state partitioned by userId, coming from the ser/deserialization
    // of a custom system having a partitioned state
    private ValueState<byte[]> dataState;

I would like to do something like the following to update dataState in a keyed context, for every key and at every checkpoint:

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        dataState.update(customSystem.getSnapshot(context.getKey())); // Not a keyed context here!
    }

instead of saving dataState in the flatMap2 function for every received event:

    public void flatMap1(Control control, Collector<Control> out) throws Exception {
        controlState.update(control);
    }

    public void flatMap2(Event event, Collector<ProcessedEvent> out) throws Exception {
        // Perform some event transformations based on controlState
        ProcessedEvent result = customSystem.process(controlState.value(), event);
        // Save internal custom system state after processing:
        // can be costly if high event throughput
        dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
        // Output the processed event
        out.collect(result);
    }

So basically, I want to be able to synchronize the partitioned state of my custom system with the checkpoints done by Flink.

Best Regards,
Marc

On Wed, Oct 6, 2021 at 12:11, Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:

> Hi Marc,
>
> I think you can just use keyed state in a CheckpointedFunction.
> FunctionInitializationContext gives you access to both keyed state and
> operator state (your stream needs to be keyed, of course). So you could
> just update your local custom state on regular invocations and update
> keyed state on calls to snapshotState.
> Check out the example in [1] where both types of state are used.
>
> Does that help? Not sure if I understood the problem correctly.
>
> Best regards,
> Nico
>
> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>
> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER <maleger...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there any method available in a RichFunction to be called by Flink
>> with a keyed context each time a checkpoint is triggered, please?
>>
>> It seems that the CheckpointedFunction interface provides such a feature
>> (snapshotState method), but only in the case of operator state, and it is
>> called in a non-keyed context.
>>
>> Indeed, I am implementing a CoFlatMapFunction with:
>> - a keyed state (state1) for a "control" stream (stream1) which is not
>>   often updated,
>> - a keyed state (state2) for a "data" stream (stream2) with a high
>>   throughput and relying on a custom solution for internal state snapshot
>>   with some potential performance impact.
>>
>> Consequently, I don't want to trigger a state2 update for every event
>> received in stream2, for efficiency reasons, but rather update state2
>> based on checkpoints triggered by Flink.
>>
>> Best Regards,
>> Marc
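
For illustration only, the goal discussed in this thread (do cheap bookkeeping per event, and take the costly per-key snapshot only when a checkpoint is triggered) can be modelled in plain Java. This is a minimal sketch with no Flink dependency, not the Flink API: the class CheckpointSyncSketch, the methods onEvent, onCheckpoint and persistedFor, and the snapshotter function are all hypothetical names. The snapshotter stands in for customSystem.getSnapshot(userId) and the map stands in for the persisted data state. How such a flush maps onto Flink keyed or operator state is deliberately left open, since snapshotState runs in a non-keyed context as noted above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Plain-Java model (no Flink dependency) of "snapshot each key only at
 * checkpoint time". All names here are hypothetical.
 */
class CheckpointSyncSketch {
    // Keys whose custom-system state changed since the last checkpoint.
    private final Set<String> dirtyKeys = new HashSet<>();
    // Stand-in for the persisted per-key state (dataState in the thread).
    private final Map<String, byte[]> persisted = new HashMap<>();
    // Stand-in for customSystem.getSnapshot(userId), assumed to be costly.
    private final Function<String, byte[]> snapshotter;

    CheckpointSyncSketch(Function<String, byte[]> snapshotter) {
        this.snapshotter = snapshotter;
    }

    /** Called for every event: only records that the key changed. */
    void onEvent(String userId) {
        dirtyKeys.add(userId);
    }

    /**
     * Called once per checkpoint: snapshots every dirty key exactly once
     * and returns the number of keys written.
     */
    int onCheckpoint() {
        int written = 0;
        for (String userId : dirtyKeys) {
            persisted.put(userId, snapshotter.apply(userId));
            written++;
        }
        dirtyKeys.clear();
        return written;
    }

    /** Returns the last persisted snapshot for a key, or null if none. */
    byte[] persistedFor(String userId) {
        return persisted.get(userId);
    }
}
```

With this model, three events for two distinct users lead to two snapshot calls at the next checkpoint instead of three, and a checkpoint with no intervening events writes nothing.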