Hi Gen,

Yeah, but AFAIK keyed state is accessible as long as you are working with a keyed stream, so these two options:
```
stream.keyBy(...).process(new MyCoProcessFunction());
stream.keyBy(...).process(new MyKeyedCoProcessFunction());
```

will both work with keyed state and timers, if I'm not mistaken, since in both cases you are on a keyed stream.

Salva

On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:

> Hi Salva,
>
> I suppose what you are missing is that the timers are stored in the keyed
> state, so you may only register timers when using KeyedCoProcessFunction.
> If you try to register a timer in the CoProcessFunction, you'll get an
> UnsupportedOperationException with the message "Setting timers is only
> supported on a keyed streams.".
>
> For the first question, as far as I know, there should be no difference
> that matters.
>
> Salva Alcántara <salcantara...@gmail.com> wrote on Thu, Dec 1, 2022 at 18:27:
>
>> The current docs claim [1]:
>>
>> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
>> to the key of timers in its onTimer(...) method."
>>
>> So, for what it's worth, it seems that if one does not need to query the
>> current key, which within a `Keyed(Co)ProcessFunction` can be done like this
>>
>> ```
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>>     K key = ctx.getCurrentKey();
>>     // ...
>> }
>> ```
>>
>> they are mostly interchangeable (???). For instance, if I wanted to
>> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
>> state with a process function, should I use `CoProcessFunction<IN1, IN2,
>> OUT>` or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
>> for the `CoProcessFunction` exactly match those for the `RichCoFlatMapFunction`.
>> The current docs also claim that `CoProcessFunction` can work with keyed
>> state [1]:
>>
>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>> to keyed state and timers."
>>
>> Also:
>>
>> "If you want to access keyed state and timers you have to apply the
>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>> MyProcessFunction());`."
>>
>> so it seems that the natural replacement should be the
>> `CoProcessFunction`, unless I'm missing something that strictly requires
>> the keyed version (???). However, I recently commented in [2] and got a
>> reply saying that I should go with `KeyedCoProcessFunction` if I'm using
>> keyed state or timers, which is a bit confusing to me. In summary:
>>
>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>>   preserve the original behaviour in the presence of keyed state?
>> - If timers are used later on, does it make any difference if I use
>>   `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>>   don't need to explicitly access the current key for anything
>>   (`ctx.getCurrentKey()`).
>>
>> Thanks in advance!
>>
>> Salva
>>
>> ---
>>
>> References
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>> [2] https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
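
For anyone reading this thread later: the condition quoted from the docs (timers need a keyed stream) can be pictured with a small toy model in plain Java. This is only a sketch, not Flink's implementation. `ToyTimerService`, `setCurrentKey`, `registerTimer` and `timerCount` are hypothetical names made up for illustration; the only thing taken from the thread is the exception message. The assumption being modelled is that timers are stored per key, so registration can only succeed while a current key is set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model only: NOT Flink's implementation. All names here are hypothetical.
class ToyTimerService {
    // Timers are stored per key, mirroring the idea that timers live in keyed state.
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();
    // A null current key models an operator running on a non-keyed stream.
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void registerTimer(long timestamp) {
        if (currentKey == null) {
            // Same message as the one quoted in the thread.
            throw new UnsupportedOperationException(
                    "Setting timers is only supported on a keyed streams.");
        }
        // A set per key, so the same (key, timestamp) pair is stored only once.
        timersByKey.computeIfAbsent(currentKey, k -> new TreeSet<>()).add(timestamp);
    }

    int timerCount(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }
}

class KeyedTimerDemo {
    public static void main(String[] args) {
        ToyTimerService timers = new ToyTimerService();

        // Keyed context: registration succeeds and is scoped to "user-1".
        timers.setCurrentKey("user-1");
        timers.registerTimer(1_000L);
        timers.registerTimer(1_000L); // same key and timestamp, stored once
        System.out.println("timers for user-1: " + timers.timerCount("user-1"));

        // Non-keyed context: registration is rejected.
        timers.setCurrentKey(null);
        try {
            timers.registerTimer(2_000L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the check depends only on whether a current key exists, not on which function class asks for the timer. Whether that matches the real operators for `CoProcessFunction` versus `KeyedCoProcessFunction` is exactly the open question in this thread, so it is worth verifying against the Flink version in use.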