Hi Salva,

I suppose what you are missing is that timers are stored in keyed state, so
you can only register timers on a keyed stream, which is what
KeyedCoProcessFunction is for. If you try to register a timer in a
CoProcessFunction on a stream that is not keyed, you'll get an
UnsupportedOperationException with the message "Setting timers is only
supported on a keyed streams.".
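
As a rough illustration of why a timer needs a key in scope, here is a toy
model in plain Java. It is not Flink's actual implementation: the class
`ToyTimerService` and its methods are hypothetical names made up for this
sketch, and only the exception message is taken from the real error.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model only: this is NOT Flink's timer service, just a sketch of the idea
// that timers are stored per key and cannot exist without one.
public class ToyTimerService {
    // Timers are kept per key, mirroring "timers are stored in keyed state".
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();

    // null stands for "no key in scope", i.e. the stream is not keyed.
    private String currentKey;

    public void setCurrentKey(String key) {
        this.currentKey = key;
    }

    public void registerTimer(long timestamp) {
        if (currentKey == null) {
            // Without a key there is nowhere to store the timer.
            throw new UnsupportedOperationException(
                    "Setting timers is only supported on a keyed streams.");
        }
        // A set per key: registering the same timestamp twice keeps one timer.
        timersByKey.computeIfAbsent(currentKey, k -> new TreeSet<>()).add(timestamp);
    }

    public int timerCount(String key) {
        TreeSet<Long> timers = timersByKey.get(key);
        return timers == null ? 0 : timers.size();
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();

        // Keyed case: the timer is stored under the current key.
        service.setCurrentKey("user-1");
        service.registerTimer(1_000L);
        System.out.println("timers for user-1: " + service.timerCount("user-1"));

        // Non-keyed case: there is no key, so registration is rejected.
        service.setCurrentKey(null);
        try {
            service.registerTimer(2_000L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the second registration fails only because no key is set,
which is the situation the error message above describes.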

For the first question, as far as I know, there should be no difference
that matters.

Salva Alcántara <salcantara...@gmail.com> wrote on Thu, Dec 1, 2022 at 18:27:

> The current docs claim [1]:
>
> "KeyedProcessFunction, as an extension of ProcessFunction, gives access
> to the key of timers in its onTimer(...) method."
>
> So, for what it's worth, it seems that if one does not need to query the
> current key, which within a `Keyed(Co)ProcessFunction` can be done like this
>
> ```
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
>     K key = ctx.getCurrentKey();
>     // ...
> }
> ```
>
> , they are mostly interchangeable (???). For instance, if I wanted to
> replace an existing `RichCoFlatMapFunction<IN1, IN2, OUT>` that uses keyed
> state with a process function, should I use `CoProcessFunction<IN1, IN2,
> OUT>` or `KeyedCoProcessFunction<K, IN1, IN2, OUT>`? The type parameters
> for the `CoProcessFunction` exactly match those for the `RichCoFlatMap`.
> The current docs also claim that `CoProcessFunction` can work with keyed
> state [1]:
>
> "The ProcessFunction can be thought of as a FlatMapFunction with access to
> keyed state and timers."
>
> Also:
>
> "If you want to access keyed state and timers you have to apply the
> ProcessFunction on a keyed stream: `stream.keyBy(...).process(new
> MyProcessFunction());`."
>
> so it seems that the natural replacement choice should be the
> `CoProcessFunction` unless I'm missing something that strictly requires the
> usage of the keyed version (???). However, I've recently commented in [2]
> and I got a reply saying that I should go with `KeyedCoProcessFunction` if
> I'm using keyed state or timers which is a bit confusing to me. In summary:
>
> - Does replacing a `CoFlatMapFunction` with a `CoProcessFunction` preserve
> the original behaviour in the presence of keyed state?
> - If timers are used later on, does it make any difference if I use
> `CoProcessFunction` instead of `KeyedCoProcessFunction`? In my case, I
> don't need to explicitly access the current key for anything
> (`ctx.getCurrentKey()`).
>
> Thanks in advance!
>
> Salva
>
> ---
>
> References
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
> [2]
> https://stackoverflow.com/questions/63708216/coprocessfunction-vs-keyedcoprocessfunction-on-connected-keyed-streams/63711885?noredirect=1#comment131747225_63711885
>
>
