Hi Salva,

I suppose what you are missing is that timers are stored in keyed state, so you can only register timers when using KeyedCoProcessFunction. If you try to register a timer in a CoProcessFunction, you'll get an UnsupportedOperationException with the message "Setting timers is only supported on a keyed streams.".
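For illustration, here is a minimal sketch of the keyed variant. It assumes Flink 1.x with `flink-streaming-java` and `flink-clients` on the classpath. The class names `KeyedTimerSketch` and `CountThenEmit`, the 1-second timer delay and the sample inputs are all hypothetical. Both inputs are keyed by the element itself before they are connected, so the `ValueState` and the event-time timer are scoped to the current key. `onTimer` reads that key through `ctx.getCurrentKey()`, which is the part `KeyedCoProcessFunction` adds over `CoProcessFunction`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class KeyedTimerSketch {

    // Hypothetical function: counts elements per key across BOTH inputs in keyed
    // state and emits "key=count" once an event-time timer fires for that key.
    static class CountThenEmit extends KeyedCoProcessFunction<String, String, String, String> {

        private static final long DELAY_MS = 1_000L; // hypothetical timer delay

        private transient ValueState<Long> count;

        // Lazily obtains the keyed-state handle; reads and writes go to the current key.
        private ValueState<Long> count() {
            if (count == null) {
                count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
            }
            return count;
        }

        private void handle(Context ctx) throws Exception {
            Long current = count().value();
            count().update(current == null ? 1L : current + 1L);
            // Registering the same timestamp twice for one key yields a single timer.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + DELAY_MS);
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            handle(ctx);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
            handle(ctx);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The current key is available here because the function runs on keyed streams.
            out.collect(ctx.getCurrentKey() + "=" + count().value());
            count().clear();
        }
    }

    static List<String> runJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Hypothetical inputs; every element gets event timestamp 0. The final
        // watermark emitted at end of input fires the pending timers.
        WatermarkStrategy<String> ts = WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner((event, previous) -> 0L);
        DataStream<String> first = env.fromElements("a", "b", "a").assignTimestampsAndWatermarks(ts);
        DataStream<String> second = env.fromElements("a", "c").assignTimestampsAndWatermarks(ts);

        List<String> results = new ArrayList<>();
        // Key both inputs the same way, then connect them.
        try (CloseableIterator<String> it = first.keyBy(s -> s)
                .connect(second.keyBy(s -> s))
                .process(new CountThenEmit())
                .executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        Collections.sort(results);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob()); // [a=3, b=1, c=1]
    }
}
```

Keying each input before `connect` makes `ConnectedStreams.process(...)` accept a `KeyedCoProcessFunction`. The same keyed state would back a former `RichCoFlatMapFunction`, with timers added on top.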
For the first question, as far as I know, there should be no difference that matters.

On Thu, Dec 1, 2022 at 18:27, Salva Alcántara <salcantara...@gmail.com> wrote:
> The current docs claim [1]:
>
> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
> to the key of timers in its onTimer(...) method."
>
> So, for what it's worth, it seems that if one does not need to query the
> current key, which within a `Keyed(Co)ProcessFunction` can be done like this
>
> ```
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>     K key = ctx.getCurrentKey();
>     // ...
> }
> ```
>
> then they are mostly interchangeable (???). For instance, if I wanted to
> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
> state with a process function, should I use
> `CoProcessFunction<IN1, IN2, OUT>` or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`?
> The type parameters for the `CoProcessFunction` exactly match those for the
> `RichCoFlatMap`. The current docs also claim that `CoProcessFunction` can
> work with keyed state [1]:
>
> "The ProcessFunction can be thought of as a FlatMapFunction with access to
> keyed state and timers."
>
> Also:
>
> "If you want to access keyed state and timers you have to apply the
> ProcessFunction on a keyed stream:
> `stream.keyBy(...).process(new MyProcessFunction());`."
>
> So it seems that the natural replacement choice should be the
> `CoProcessFunction` unless I'm missing something that strictly requires
> the keyed version (???). However, I recently commented in [2] and got a
> reply saying that I should go with `KeyedCoProcessFunction` if I'm using
> keyed state or timers, which is a bit confusing to me. In summary:
>
> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction` preserve
>   the original behaviour in the presence of keyed state?
> - If timers are used later on, does it make any difference if I use
>   `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>   don't need to explicitly access the current key for anything
>   (`ctx.getCurrentKey()`).
>
> Thanks in advance!
>
> Salva
>
> ---
>
> References
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
> [2] https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885