Hi Salva,

I misunderstood your point, and yes, both will work well.

`stream.keyBy(...).process(new MyCoProcessFunction());` uses LegacyKeyedCoProcessOperator, and `stream.keyBy(...).process(new MyKeyedCoProcessFunction());` uses KeyedCoProcessOperator. If you compare the two operators, you'll find they are almost the same except for the `getCurrentKey` method.
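
To illustrate the point about `getCurrentKey`, here is a toy model in plain Java. It is only a sketch and does not use Flink's classes: `KeyedTimerSketch`, `Context`, `KeyedContext`, `ToyKeyedOperator`, `plainStyle` and `keyedStyle` are hypothetical names, and the real operators do far more (state backends, watermarks, checkpointing). The only thing it tries to show is that, on a keyed stream, timers end up scoped to the current key in both styles, and the keyed style additionally lets the function read that key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Flink classes.
public class KeyedTimerSketch {

    /** Context for the non-keyed-style function: can register timers, cannot read the key. */
    interface Context {
        void registerTimer(long timestamp);
    }

    /** Context for the keyed-style function: same timers, plus access to the current key. */
    interface KeyedContext<K> extends Context {
        K getCurrentKey();
    }

    /** Hypothetical stand-in for an operator that runs on a keyed stream. */
    static final class ToyKeyedOperator<K> implements KeyedContext<K> {
        private final Map<K, List<Long>> timersByKey = new HashMap<>();
        private K currentKey;

        // The runtime would set this before each element is processed.
        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        @Override
        public void registerTimer(long timestamp) {
            // Timers are stored per key, whichever function style registered them.
            timersByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(timestamp);
        }

        @Override
        public K getCurrentKey() {
            return currentKey;
        }

        List<Long> timersFor(K key) {
            return timersByKey.getOrDefault(key, Collections.emptyList());
        }
    }

    /** Mimics a function that only sees the plain context. */
    static void plainStyle(Context ctx, long timestamp) {
        ctx.registerTimer(timestamp);
    }

    /** Mimics a function that sees the keyed context and can also query the key. */
    static <K> K keyedStyle(KeyedContext<K> ctx, long timestamp) {
        ctx.registerTimer(timestamp);
        return ctx.getCurrentKey();
    }

    public static void main(String[] args) {
        ToyKeyedOperator<String> op = new ToyKeyedOperator<>();
        op.setCurrentKey("a");
        plainStyle(op, 10L);
        op.setCurrentKey("b");
        String seenKey = keyedStyle(op, 20L);
        // prints: [10] [20] b
        System.out.println(op.timersFor("a") + " " + op.timersFor("b") + " " + seenKey);
    }
}
```

In this sketch both styles register a timer under whatever key is current, which mirrors why either function type works once you are on a keyed stream.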

Salva Alcántara <salcantara...@gmail.com> wrote on Fri, Dec 2, 2022 at 16:38:

> Hi Gen,
>
> Yeah, but AFAIK keyed state is accessible as long as you are working with
> a keyed stream, so these two options:
>
> ```
> stream.keyBy(...).process(new MyCoProcessFunction());
> stream.keyBy(...).process(new MyKeyedCoProcessFunction());
> ```
>
> will just work well with keyed state & timers, if I'm not mistaken, since
> in both cases you are on a keyed stream.
>
> Salva
>
> On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> Hi Salva,
>>
>> I suppose what you are missing is that the timers are stored in the
>> keyed state, so you may only register timers when using
>> KeyedCoProcessFunction. If you try to register a timer in the
>> CoProcessFunction, you'll get an UnsupportedOperationException with the
>> message "Setting timers is only supported on a keyed streams.".
>>
>> For the first question, as far as I know, there should be no difference
>> that matters.
>>
>> Salva Alcántara <salcantara...@gmail.com> wrote on Thu, Dec 1, 2022 at 18:27:
>>
>>> The current docs claim [1]:
>>>
>>> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
>>> to the key of timers in its onTimer(...) method."
>>>
>>> So, for what it's worth, it seems that if one does not need to query
>>> the current key, which within a `Keyed(Co)ProcessFunction` can be done
>>> like this
>>>
>>> ```
>>> @Override
>>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>>>     K key = ctx.getCurrentKey();
>>>     // ...
>>> }
>>> ```
>>>
>>> , they are mostly interchangeable (???). For instance, if I wanted to
>>> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses
>>> keyed state with a process function, should I use
>>> `CoProcessFunction<IN1, IN2, OUT>` or
>>> `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters for the
>>> `CoProcessFunction` exactly match those for the `RichCoFlatMap`.
>>>
>>> The current docs also claim that `CoProcessFunction` can work with keyed
>>> state [1]:
>>>
>>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>>> to keyed state and timers."
>>>
>>> Also:
>>>
>>> "If you want to access keyed state and timers you have to apply the
>>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>>> MyProcessFunction());`."
>>>
>>> so it seems that the natural replacement choice should be the
>>> `CoProcessFunction` unless I'm missing something that strictly requires
>>> the usage of the keyed version (???). However, I've recently commented
>>> in [2] and I got a reply saying that I should go with
>>> `KeyedCoProcessFunction` if I'm using keyed state or timers, which is a
>>> bit confusing to me. In summary:
>>>
>>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>>>   preserve the original behaviour in presence of keyed state?
>>> - If timers are used later on, does it make any difference if I use
>>>   `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>>>   don't need to explicitly access the current key for anything
>>>   (`ctx.getCurrentKey()`).
>>>
>>> Thanks in advance!
>>>
>>> Salva
>>>
>>> ---
>>>
>>> References
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>>> [2] https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885