Hi Gen,

Yeah, but AFAIK keyed state is accessible as long as you are working with a
keyed stream, so these two options:

```
stream.keyBy(...).process(new MyCoProcessFunction());
stream.keyBy(...).process(new MyKeyedCoProcessFunction());
```

will both work with keyed state and timers, if I'm not mistaken, since in
both cases you are on a keyed stream.
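
To make the mental model concrete, here is a toy sketch in plain Java. It is
not Flink code: `ToyOperator`, `PlainFunction`, `KeyAwareFunction` and
`KeyScopeDemo` are hypothetical names made up for illustration. The sketch only
encodes my assumption that state and timers are scoped by the operator's
current key (i.e. by whether the stream is keyed) and not by which function
class is plugged in. It says nothing about how Flink implements this.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: none of these types are Flink classes.
final class KeyScopeDemo {

    static final class ToyOperator {
        private final boolean keyed;  // true if the stream was keyed upstream
        private String currentKey;    // key of the element being processed
        private final Map<String, Long> counts = new HashMap<>();      // per-key state
        private final Map<String, Set<Long>> timers = new HashMap<>(); // per-key timers

        ToyOperator(boolean keyed) { this.keyed = keyed; }

        void setCurrentKey(String key) { this.currentKey = key; }

        String currentKey() {
            requireKeyed("Reading the current key is only supported on a keyed stream.");
            return currentKey;
        }

        void incrementCount() {
            requireKeyed("Keyed state is only supported on a keyed stream.");
            counts.merge(currentKey, 1L, Long::sum);
        }

        void registerTimer(long timestamp) {
            // Message text borrowed from the exception quoted in this thread.
            requireKeyed("Setting timers is only supported on a keyed streams.");
            timers.computeIfAbsent(currentKey, k -> new TreeSet<>()).add(timestamp);
        }

        long count(String key) { return counts.getOrDefault(key, 0L); }

        int timerCount(String key) {
            return timers.getOrDefault(key, Collections.emptySet()).size();
        }

        private void requireKeyed(String message) {
            if (!keyed) throw new UnsupportedOperationException(message);
        }
    }

    interface ToyFunction {
        void processElement(String value, ToyOperator op);
    }

    // Stand-in for the non-keyed function variant: never asks for the key.
    static final class PlainFunction implements ToyFunction {
        @Override
        public void processElement(String value, ToyOperator op) {
            op.incrementCount();
            op.registerTimer(100L);
        }
    }

    // Stand-in for the keyed variant: same logic, plus it reads the current key.
    static final class KeyAwareFunction implements ToyFunction {
        final List<String> seenKeys = new ArrayList<>();

        @Override
        public void processElement(String value, ToyOperator op) {
            seenKeys.add(op.currentKey());
            op.incrementCount();
            op.registerTimer(100L);
        }
    }

    // Feeds the values through fn, keying each element by its own value.
    static ToyOperator run(ToyFunction fn, boolean keyed, String... values) {
        ToyOperator op = new ToyOperator(keyed);
        for (String value : values) {
            if (keyed) op.setCurrentKey(value);
            fn.processElement(value, op);
        }
        return op;
    }

    public static void main(String[] args) {
        ToyOperator plain = run(new PlainFunction(), true, "a", "b", "a");
        ToyOperator aware = run(new KeyAwareFunction(), true, "a", "b", "a");
        System.out.println(plain.count("a") + " " + aware.count("a"));
        System.out.println(plain.timerCount("a") + " " + aware.timerCount("a"));
    }
}
```

In this toy, both functions end up with a count of 2 and a single timer for
key "a", and the only thing the key-aware one gains is the list of keys it
saw. Running either of them with `keyed` set to false throws an
`UnsupportedOperationException`, which is the behaviour I would expect from a
non-keyed stream.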

Salva



On Fri, Dec 2, 2022 at 3:27 AM Gen Luo <luogen...@gmail.com> wrote:

> Hi Salva,
>
> I suppose what you are missing is that the timers are stored in the keyed
> state, so you may only register timers when using KeyedCoProcessFunction.
> If you try to register a timer in the CoProcessFunction, you'll get an
> UnsupportedOperationException with the message "Setting timers is only
> supported on a keyed streams.".
>
> For the first question, as far as I know, there should be no difference
> that matters.
>
> On Thu, Dec 1, 2022 at 6:27 PM Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> The current docs claim [1]:
>>
>> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
>> to the key of timers in its onTimer(...) method."
>>
>> So, for what it's worth, it seems that if one does not need to query the
>> current key, which within a `Keyed(Co)ProcessFunction` can be done like this
>>
>> ```
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>>     K key = ctx.getCurrentKey();
>>     // ...
>> }
>> ```
>>
>> , they are mostly interchangeable (???). For instance, if I wanted to
>> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
>> state with a process function, should I use `CoProcessFunction<IN1, IN2,
>> OUT>` or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
>> for the `CoProcessFunction` exactly match those for the `RichCoFlatMap`.
>> The current docs also claim that `CoProcessFunction` can work with keyed
>> state [1]:
>>
>> "The ProcessFunction can be thought of as a FlatMapFunction with access
>> to keyed state and timers."
>>
>> Also:
>>
>> "If you want to access keyed state and timers you have to apply the
>> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
>> MyProcessFunction());`."
>>
>> so it seems that the natural replacement choice should be the
>> `CoProcessFunction` unless I'm missing something that strictly requires the
>> usage of the keyed version (???). However, I've recently commented in [2]
>> and I got a reply saying that I should go with `KeyedCoProcessFunction` if
>> I'm using keyed state or timers which is a bit confusing to me. In summary:
>>
>> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction`
>> preserve the original behaviour in the presence of keyed state?
>> - If timers are used later on, does it make any difference if I use
>> `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
>> don't need to explicitly access the current key for anything
>> (`ctx.getCurrentKey()`).
>>
>> Thanks in advance!
>>
>> Salva
>>
>> ---
>>
>> References
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>> [2]
>> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
>>
>>
