Hi Salva,

I misunderstood your point earlier, and yes, both will work well.
`stream.keyBy(...).process(new MyCoProcessFunction());` uses
`LegacyKeyedCoProcessOperator`, and `stream.keyBy(...).process(new
MyKeyedCoProcessFunction());` uses `KeyedCoProcessOperator`. Comparing the
two operators, you'll find they are almost the same except for the
`getCurrentKey()` method, which only the keyed variant's context exposes.
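To make the comparison concrete, here is a minimal sketch, assuming the Flink 1.16-era DataStream API (where rich functions override `open(Configuration)`). The class names, the per-key counter and the 10 s processing-time timer are hypothetical, for illustration only. With two inputs, the `stream.keyBy(...).process(...)` shorthand above corresponds to connecting two keyed streams. Both functions use keyed state and register timers; only the keyed variant can read the key via `ctx.getCurrentKey()`.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedCoProcessSketch {

    // Hypothetical shared logic: bump the per-key counter and, on the first
    // element for a key, schedule a processing-time timer 10 s from now.
    // Both the state and the timer are scoped to the current key.
    static void increment(ValueState<Long> count, TimerService timers) throws Exception {
        Long current = count.value();
        if (current == null) {
            timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 10_000L);
            current = 0L;
        }
        count.update(current + 1);
    }

    /** Plain CoProcessFunction: keyed state and timers work on keyed inputs. */
    public static class CountingCoProcess extends CoProcessFunction<String, String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Keyed state: only obtainable when the connected inputs are keyed.
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<Long> out) throws Exception {
            increment(count, ctx.timerService());
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<Long> out) throws Exception {
            increment(count, ctx.timerService());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            // State is already scoped to the timer's key, but the key is not exposed here.
            out.collect(count.value());
            count.clear();
        }
    }

    /** Keyed variant: same logic, but the key is also available. */
    public static class KeyedCountingCoProcess
            extends KeyedCoProcessFunction<String, String, String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            increment(count, ctx.timerService());
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
            increment(count, ctx.timerService());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The practical difference: getCurrentKey() is available here.
            out.collect(ctx.getCurrentKey() + "=" + count.value());
            count.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> left = env.fromElements("a", "b", "a");
        DataStream<String> right = env.fromElements("a", "c");

        // Both inputs keyed by the same key (here, the string itself).
        left.keyBy(s -> s).connect(right.keyBy(s -> s))
                .process(new CountingCoProcess())       // runs in LegacyKeyedCoProcessOperator
                .print();
        left.keyBy(s -> s).connect(right.keyBy(s -> s))
                .process(new KeyedCountingCoProcess())  // runs in KeyedCoProcessOperator
                .print();

        env.execute("co-process sketch");
    }
}
```

If the two inputs were connected without `keyBy`, `CountingCoProcess` would run in a non-keyed operator instead: the `getState` call in `open` would fail because keyed state requires a keyed stream, and registering a timer would throw the `UnsupportedOperationException` quoted below.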

On Fri, Dec 2, 2022 at 16:38, Salva Alcántara <salcantara...@gmail.com> wrote:

> Hi Gen,
>
> Yeah, but AFAIK keyed state is accessible as long as you are working with a
> keyed stream, so these two options:
>
> ```
> stream.keyBy(...).process(new MyCoProcessFunction());
> stream.keyBy(...).process(new MyKeyedCoProcessFunction());
> ```
>
> will just work well with keyed state & timers if I'm not mistaken since in
> both cases you are on a keyed stream.
>
> Salva
>
>
>
> On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> Hi Salva,
>>
>> I suppose what you are missing is that timers are stored in keyed
>> state, so you may only register timers when using
>> KeyedCoProcessFunction. If you try to register a timer in a
>> CoProcessFunction, you'll get an UnsupportedOperationException with the
>> message "Setting timers is only supported on a keyed streams.".
>>
>> For the first question, as far as I know, there should be no difference
>> that matters.
>>
>> On Thu, Dec 1, 2022 at 18:27, Salva Alcántara <salcantara...@gmail.com> wrote:
>>
>>> The current docs claim [1]:
>>>
>>> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
>>> to the key of timers in its onTimer(...) method."
>>>
>>> So, for what it's worth, it seems that if one does not need to query
>>> the current key, which within a `Keyed(Co)ProcessFunction` can be done like
>>> this
>>>
>>> ```
>>> @Override
>>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>>>     K key = ctx.getCurrentKey();
>>>     // ...
>>> }
>>> ```
>>>
>>> then they are mostly interchangeable (???). For instance, if I wanted to
>>> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
>>> state with a process function, should I use `CoProcessFunction<IN1, IN2,
>>> OUT>` or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
>>> for `CoProcessFunction` exactly match those for `RichCoFlatMapFunction`.
>>> The current docs also claim that `CoProcessFunction` can work with keyed
>>> state [1]:
>>>
>>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>>> to keyed state and timers."
>>>
>>> Also:
>>>
>>> "If you want to access keyed state and timers you have to apply the
>>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>>> MyProcessFunction());`."
>>>
>>> so it seems that the natural replacement choice should be
>>> `CoProcessFunction` unless I'm missing something that strictly requires the
>>> keyed version (???). However, I recently commented on [2] and got a reply
>>> saying that I should go with `KeyedCoProcessFunction` if I'm using keyed
>>> state or timers, which is a bit confusing to me. In summary:
>>>
>>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>>> preserve the original behaviour in the presence of keyed state?
>>> - If timers are used later on, does it make any difference if I use
>>> `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>>> don't need to explicitly access the current key for anything
>>> (`ctx.getCurrentKey()`).
>>>
>>> Thanks in advance!
>>>
>>> Salva
>>>
>>> ---
>>>
>>> References
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>>> [2]
>>> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
>>>
>>>
