Yeah, I took a look too... I guess that even if both work, the clearer
(winner) option would be the Keyed(Co)ProcessFunction.

Anyway, thanks a lot!

Salva
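For reference, here is a minimal sketch of the Keyed(Co)ProcessFunction option settled on above. It replaces a hypothetical `RichCoFlatMapFunction` that keeps a per-key counter. It assumes the Flink 1.x DataStream API as of roughly 1.16, where `open(Configuration)` is still the lifecycle hook (newer releases move to `open(OpenContext)`). The class name `CountPerKey`, the counting logic, the one-minute delay, and the String/Long input types are all illustrative.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example: counts elements per key across two inputs and emits the
 * count one minute (processing time) after the first element seen for that key.
 * Type parameters: K = String, IN1 = String, IN2 = Long, OUT = String.
 */
public class CountPerKey extends KeyedCoProcessFunction<String, String, Long, String> {

    private static final long DELAY_MS = 60_000L;

    // Keyed state: Flink scopes this value to the key of the element being processed.
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
        increment(ctx);
    }

    @Override
    public void processElement2(Long value, Context ctx, Collector<String> out) throws Exception {
        increment(ctx);
    }

    private void increment(Context ctx) throws Exception {
        Long current = count.value();
        if (current == null) {
            // First element for this key: register a timer. Like the state above,
            // the timer is scoped to the current key.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + DELAY_MS);
            current = 0L;
        }
        count.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The keyed variant's OnTimerContext exposes the key of the firing timer.
        out.collect(ctx.getCurrentKey() + " -> " + count.value());
        count.clear();
    }
}
```

It would be wired up as `first.connect(second).keyBy(..., ...).process(new CountPerKey())`, with one key selector per input. The same body written as a plain `CoProcessFunction` on the keyed connected stream should also run (through LegacyKeyedCoProcessOperator, as Gen notes), but its `onTimer` context has no `getCurrentKey()`.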

On Fri, Dec 2, 2022, 12:23 Gen Luo <luogen...@gmail.com> wrote:

> Hi Salva,
>
> I misunderstood the idea, and yes, both will work well.
> `stream.keyBy(...).process(new MyCoProcessFunction());` uses
> LegacyKeyedCoProcessOperator, and `stream.keyBy(...).process(new
> MyKeyedCoProcessFunction());` uses KeyedCoProcessOperator. Comparing the
> two operators, you'll find they are almost the same except for the
> `getCurrentKey` method.
>
> On Fri, Dec 2, 2022, 16:38 Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> Hi Gen,
>>
>> Yeah, but AFAIK keyed state is accessible as long as you are working with
>> a keyed stream, so these two options:
>>
>> ```
>> stream.keyBy(...).process(new MyCoProcessFunction());
>> stream.keyBy(...).process(new MyKeyedCoProcessFunction());
>> ```
>>
>> will both work well with keyed state and timers, if I'm not mistaken, since
>> in both cases you are on a keyed stream.
>>
>> Salva
>>
>>
>>
>> On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:
>>
>>> Hi Salva,
>>>
>>> I suppose what you are missing is that timers are stored in
>>> keyed state, so you may only register timers when using
>>> KeyedCoProcessFunction. If you try to register a timer in the
>>> CoProcessFunction, you'll get an UnsupportedOperationException with the
>>> message "Setting timers is only supported on a keyed streams.".
>>>
>>> For the first question, as far as I know, there should be no difference
>>> that matters.
>>>
>>> On Thu, Dec 1, 2022, 18:27 Salva Alcántara <salcantara...@gmail.com> wrote:
>>>
>>>> The current docs claim [1]:
>>>>
>>>> "KeyedProcessFunction, as an extension of ProcessFunction, gives
>>>> access to the key of timers in its onTimer(...) method."
>>>>
>>>> So, for what it's worth, it seems that if one does not need to query
>>>> the current key, which within a `Keyed(Co)ProcessFunction` can be done like
>>>> this:
>>>>
>>>> ```
>>>> @Override
>>>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>>>>     K key = ctx.getCurrentKey();
>>>>     // ...
>>>> }
>>>> ```
>>>>
>>>> then they are mostly interchangeable (???). For instance, if I wanted to
>>>> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
>>>> state with a process function, should I use `CoProcessFunction<IN1, IN2, OUT>`
>>>> or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
>>>> for `CoProcessFunction` exactly match those for `RichCoFlatMapFunction`.
>>>> The current docs also claim that `CoProcessFunction` can work with keyed
>>>> state [1]:
>>>>
>>>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>>>> to keyed state and timers."
>>>>
>>>> Also:
>>>>
>>>> "If you want to access keyed state and timers you have to apply the
>>>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>>>> MyProcessFunction());`."
>>>>
>>>> so it seems that the natural replacement choice should be
>>>> `CoProcessFunction`, unless I'm missing something that strictly requires the
>>>> keyed version (???). However, I recently commented in [2]
>>>> and got a reply saying that I should go with `KeyedCoProcessFunction` if
>>>> I'm using keyed state or timers, which is a bit confusing to me. In summary:
>>>>
>>>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>>>> preserve the original behaviour in the presence of keyed state?
>>>> - If timers are used later on, does it make any difference whether I use
>>>> `CoProcessFunction` or `KeyedCoProcessFunction`? In my case, I
>>>> don't need to explicitly access the current key for anything
>>>> (`ctx.getCurrentKey()`).
>>>>
>>>> Thanks in advance!
>>>>
>>>> Salva
>>>>
>>>> ---
>>>>
>>>> References
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>>>> [2]
>>>> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
>>>>
>>>>
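Gen's explanation of why timers need a key, and why only the keyed variant can hand the key back in `onTimer`, can be illustrated with a toy model in plain Java. This is not Flink's API or implementation; `ToyKeyedTimers` and its methods are hypothetical. Timers are stored as (key, timestamp) pairs. Registering one requires a current key, mirroring the `UnsupportedOperationException` message quoted earlier in the thread. Duplicate registrations for the same key and timestamp collapse into one, as Flink's docs describe for timers. Before each timer fires, the model restores that timer's key.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;

/** Toy model (NOT Flink's API): timers scoped to a "current key". */
public class ToyKeyedTimers<K> {

    /** A registered timer: the key it belongs to plus its firing time. */
    private static final class Timer<T> {
        final T key;
        final long timestamp;

        Timer(T key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer<?> other = (Timer<?>) o;
            return timestamp == other.timestamp && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    // Pending timers ordered by firing time, plus a set used to drop duplicates.
    private final PriorityQueue<Timer<K>> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<Timer<K>> pending = new HashSet<>();

    // null models a non-keyed context: there is no key to scope a timer to.
    private K currentKey;

    public void setCurrentKey(K key) {
        currentKey = key;
    }

    public K getCurrentKey() {
        return currentKey;
    }

    /** Registers a timer for the current key; repeating the same timestamp is a no-op. */
    public void registerTimer(long timestamp) {
        if (currentKey == null) {
            // Same message as quoted in the thread for the non-keyed case.
            throw new UnsupportedOperationException(
                    "Setting timers is only supported on a keyed streams.");
        }
        Timer<K> timer = new Timer<>(currentKey, timestamp);
        if (pending.add(timer)) {
            queue.add(timer);
        }
    }

    /** Fires all timers due at or before {@code time}, restoring each timer's key first. */
    public void advanceTo(long time, BiConsumer<Long, K> onTimer) {
        while (!queue.isEmpty() && queue.peek().timestamp <= time) {
            Timer<K> timer = queue.poll();
            pending.remove(timer);
            setCurrentKey(timer.key);
            onTimer.accept(timer.timestamp, getCurrentKey());
        }
    }
}
```

Because the key travels with the timer, a keyed callback can report it, while a context with no current key has nothing to attach a timer to.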
