Yeah, I took a look too... I guess that even if both work, the clearer (winning) option would be the `Keyed(Co)ProcessFunction`.
Anyway, thanks a lot!

Salva

On Fri, Dec 2, 2022, 12:23 Gen Luo <luogen...@gmail.com> wrote:

> Hi Salva,
>
> I misunderstood the idea, and yes, both will work well.
> `stream.keyBy(...).process(new MyCoProcessFunction());` uses
> LegacyKeyedCoProcessOperator, and `stream.keyBy(...).process(new
> MyKeyedCoProcessFunction());` uses KeyedCoProcessOperator. Comparing the
> two operators you'll find they are almost the same except for the
> `getCurrentKey` method.
>
> Salva Alcántara <salcantara...@gmail.com> wrote on Fri, Dec 2, 2022 at 16:38:
>
>> Hi Gen,
>>
>> Yeah, but AFAIK keyed state is accessible as long as you are working with
>> a keyed stream, so these two options:
>>
>> ```
>> stream.keyBy(...).process(new MyCoProcessFunction());
>> stream.keyBy(...).process(new MyKeyedCoProcessFunction());
>> ```
>>
>> will just work well with keyed state & timers if I'm not mistaken, since
>> in both cases you are on a keyed stream.
>>
>> Salva
>>
>> On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:
>>
>>> Hi Salva,
>>>
>>> I suppose what you are missing is that the timers are stored in the
>>> keyed state, so you may only register timers when using
>>> KeyedCoProcessFunction. If you try to register a timer in the
>>> CoProcessFunction, you'll get an UnsupportedOperationException with the
>>> message "Setting timers is only supported on a keyed streams.".
>>>
>>> For the first question, as far as I know, there should be no difference
>>> that matters.
>>>
>>> Salva Alcántara <salcantara...@gmail.com> wrote on Thu, Dec 1, 2022 at 18:27:
>>>
>>>> The current docs claim [1]:
>>>>
>>>> "KeyedProcessFunction, as an extension of ProcessFunction, gives
>>>> access to the key of timers in its onTimer(...) method."
>>>>
>>>> So, for what it's worth, it seems that if one does not need to query
>>>> the current key, which within a `Keyed(Co)ProcessFunction` can be done
>>>> like this
>>>>
>>>> ```
>>>> @Override
>>>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out)
>>>>         throws Exception {
>>>>     K key = ctx.getCurrentKey();
>>>>     // ...
>>>> }
>>>> ```
>>>>
>>>> then the two are mostly interchangeable (???). For instance, if I wanted
>>>> to replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses
>>>> keyed state with a process function, should I use
>>>> `CoProcessFunction<IN1, IN2, OUT>` or
>>>> `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters for the
>>>> `CoProcessFunction` exactly match those for the `RichCoFlatMapFunction`.
>>>> The current docs also claim that `CoProcessFunction` can work with keyed
>>>> state [1]:
>>>>
>>>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>>>> to keyed state and timers."
>>>>
>>>> Also:
>>>>
>>>> "If you want to access keyed state and timers you have to apply the
>>>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>>>> MyProcessFunction());`."
>>>>
>>>> So it seems that the natural replacement choice should be the
>>>> `CoProcessFunction`, unless I'm missing something that strictly requires
>>>> the keyed version (???). However, I recently commented on [2] and got a
>>>> reply saying that I should go with `KeyedCoProcessFunction` if I'm using
>>>> keyed state or timers, which is a bit confusing to me. In summary:
>>>>
>>>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>>>> preserve the original behaviour in the presence of keyed state?
>>>> - If timers are used later on, does it make any difference if I use
>>>> `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>>>> don't need to explicitly access the current key for anything
>>>> (`ctx.getCurrentKey()`).
>>>>
>>>> Thanks in advance!
>>>>
>>>> Salva
>>>>
>>>> ---
>>>>
>>>> References
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>>>> [2]
>>>> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
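
As a footnote to the thread: the conclusion that both variants behave the same on a keyed stream, and differ only in whether the current key is exposed, can be pictured with a toy model. The sketch below is plain Java and not Flink code. `Context`, `KeyedContext`, `Operator`, `plainLogic`, `keyedLogic` and `run` are all hypothetical names made up for illustration, and having `KeyedContext` extend `Context` is a simplifying assumption, not a statement about Flink's class hierarchy. The only idea it tries to capture is that the operator scopes state and timers to the current key before calling the user function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, NOT Flink code: every name in this file is hypothetical. */
public class KeyedContextSketch {

    /** What the non-keyed flavour sees: per-key state and timers, but no key accessor. */
    interface Context {
        int getCount();                     // "keyed state", scoped to the current key
        void setCount(int count);
        void registerTimer(long timestamp); // "timer", also scoped to the current key
    }

    /** The keyed flavour adds exactly one thing: access to the current key. */
    interface KeyedContext extends Context {
        String getCurrentKey();
    }

    /** Stand-in for an operator that runs on a keyed stream. */
    static final class Operator implements KeyedContext {
        private final Map<String, Integer> counts = new HashMap<>();
        private final List<String> timers = new ArrayList<>();
        private String currentKey;

        void setCurrentKey(String key) { currentKey = key; }

        @Override public int getCount() { return counts.getOrDefault(currentKey, 0); }
        @Override public void setCount(int count) { counts.put(currentKey, count); }
        @Override public void registerTimer(long timestamp) { timers.add(currentKey + "@" + timestamp); }
        @Override public String getCurrentKey() { return currentKey; }
    }

    /** User logic against the plain context: count elements, set a timer on the first one per key. */
    static void plainLogic(Context ctx) {
        int next = ctx.getCount() + 1;
        ctx.setCount(next);
        if (next == 1) {
            ctx.registerTimer(100L);
        }
    }

    /** Same logic against the keyed context; the key is available but not needed here. */
    static void keyedLogic(KeyedContext ctx) {
        plainLogic(ctx);
    }

    /** Feeds the keys through one operator and returns "counts|timers" for comparison. */
    public static String run(boolean keyedFlavour, String... keys) {
        Operator op = new Operator();
        for (String key : keys) {
            op.setCurrentKey(key); // the operator, not the user function, selects the key scope
            if (keyedFlavour) {
                keyedLogic(op);
            } else {
                plainLogic(op);
            }
        }
        return new TreeMap<>(op.counts) + "|" + op.timers;
    }

    public static void main(String[] args) {
        // Both lines print {a=2, b=1}|[a@100, b@100]
        System.out.println(run(false, "a", "b", "a"));
        System.out.println(run(true, "a", "b", "a"));
    }
}
```

In this toy, neither flavour needs the key to get correctly scoped state or timers. The keyed flavour only matters once the logic wants to read the key itself, which matches the `getCurrentKey` difference mentioned in the thread.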