Hi Shuiqiang, meneldor,

1. In fact, there is a problem with using a Python `Named Row` as the return 
value of a user-defined function in PyFlink.

        When a Row is serialized, the field serializers are applied in the order 
of the Row's fields. However, the fields of a Python `Named Row` are sorted by 
field name (this was designed to make comparing Named Rows and computing their 
hash values easier), so they may no longer follow the order declared in 
`output_type_info`. This can lead to serialization/deserialization errors, 
because the serializers end up paired with the wrong fields. Serializers are 
matched by position rather than by field name for performance reasons, but 
`Named Row` support could be added at the cost of a little performance in 
exchange for ease of use. For the current example, I suggest returning a list 
or a normal Row instead of a Named Row.
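To make the failure mode concrete, here is a plain-Python sketch (no PyFlink involved) that mimics what is described above: a Named Row built from keyword arguments orders its values by sorted field name, while the serializers are picked positionally in the order declared in `output_type_info`. The field names come from meneldor's example quoted below; `named_row_values`, `serialize_positionally` and the serializer table are hypothetical stand-ins, not PyFlink internals.

```python
from typing import Any, Callable, List

# Field order as declared in output_type_info (from meneldor's example).
DECLARED_FIELDS = ["id", "tp", "account", "device_ts", "timestamp"]

# Hypothetical per-field serializers, chosen by position only:
# three STRING fields followed by two LONG fields.
SERIALIZERS: List[Callable[[Any], bytes]] = [
    lambda v: v.encode("utf-8"),     # id: STRING
    lambda v: v.encode("utf-8"),     # tp: STRING
    lambda v: v.encode("utf-8"),     # account: STRING
    lambda v: v.to_bytes(8, "big"),  # device_ts: LONG
    lambda v: v.to_bytes(8, "big"),  # timestamp: LONG
]


def named_row_values(**kwargs: Any) -> List[Any]:
    """Mimic a Named Row built from kwargs: values ordered by sorted field name."""
    return [kwargs[name] for name in sorted(kwargs)]


def serialize_positionally(values: List[Any]) -> List[bytes]:
    """Apply the i-th serializer to the i-th value, ignoring field names."""
    return [ser(val) for ser, val in zip(SERIALIZERS, values)]


if __name__ == "__main__":
    row = dict(id="k1", tp="t1", account="TEST", device_ts=1111111111, timestamp=1500)
    try:
        # Sorted order is account, device_ts, id, timestamp, tp, so the second
        # STRING serializer receives the integer device_ts and fails.
        serialize_positionally(named_row_values(**row))
    except AttributeError as exc:
        print("serializer/field mismatch:", exc)
    # Emitting the values in declared order (a list or a positional Row) works.
    print(len(serialize_positionally([row[name] for name in DECLARED_FIELDS])))
```

Reordering the values into the declared field order before emitting them is exactly what returning a list or a normal Row achieves.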


2. In PyFlink 1.12.0, both `process_element` and `on_timer` take an extra 
`out: Collector` parameter, e.g. 
`def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector)`[1]. 
If you want to send data from `on_timer` (or from `process_element`), call 
`Collector.collect`, e.g.

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
            out.collect(Row('a', 'b', 'c'))
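A minimal sketch of meneldor's function rewritten in the Collector style, assuming the PyFlink 1.12.0 signatures described above. So that it runs without a Flink cluster, `ListCollector`, `FakeTimerService` and `FakeContext` are hypothetical stand-ins, not PyFlink classes; in a real job the class would subclass `KeyedProcessFunction` and Flink would pass in the collector and contexts. Records are emitted positionally (a plain list in declared field order) instead of as a Named Row.

```python
from typing import Any, List, Tuple


class ListCollector:
    """Hypothetical stand-in for the Collector Flink passes in."""

    def __init__(self) -> None:
        self.records: List[Any] = []

    def collect(self, value: Any) -> None:
        self.records.append(value)


class FakeTimerService:
    """Hypothetical stand-in that only remembers registered timers."""

    def __init__(self) -> None:
        self.timers: List[int] = []

    def register_event_time_timer(self, t: int) -> None:
        self.timers.append(t)


class FakeContext:
    """Hypothetical stand-in for KeyedProcessFunction.Context / OnTimerContext."""

    def __init__(self, key: Tuple[str, str], ts: int) -> None:
        self._key = key
        self._ts = ts
        self._timers = FakeTimerService()

    def get_current_key(self) -> Tuple[str, str]:
        return self._key

    def timestamp(self) -> int:
        return self._ts

    def timer_service(self) -> FakeTimerService:
        return self._timers


class MyProcessFunction:
    """Collector-style version of meneldor's function (1.12.0-style signatures)."""

    def process_element(self, value, ctx, out) -> None:
        key = ctx.get_current_key()
        # Positional output in the declared order:
        # id, tp, account, device_ts, timestamp
        out.collect([key[0], key[1], "TEST", value[3][2], ctx.timestamp()])
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx, out) -> None:
        key = ctx.get_current_key()
        # Same shape as process_element's output; device_ts is hardcoded
        # because there is no input value in on_timer.
        out.collect([key[0], key[1], "TEST", 1111111111, timestamp])


if __name__ == "__main__":
    fn = MyProcessFunction()
    ctx = FakeContext(("k1", "t1"), 1000)
    out = ListCollector()
    fn.process_element(("x", "y", "z", (0, 0, 42)), ctx, out)
    fn.on_timer(2500, ctx, out)
    print(out.records)
```

Both methods emit records of the same shape, which is what the declared output type requires; a positional `Row(...)` could be used in place of the list.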

3. >>> I am not sure if the timestamp field should be included in 
output_type_info as i did now.

        If the data you return contains a `timestamp` field, `output_type_info` 
must declare that field as well. In your example, the returned rows include 
`timestamp`, so `output_type_info` needs both its name and its type.
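As a quick sanity check on the point above (and on the names/types alignment raised in the quoted message below), here is a plain-Python sketch that compares an emitted record against a declaration mirroring meneldor's `Types.ROW_NAMED(...)`. The type names are plain strings standing in for `Types.STRING()` / `Types.LONG()`, and `check_record` is a hypothetical helper, not a PyFlink API.

```python
from typing import Any, List, Sequence

# Plain-string mirror of meneldor's declaration:
#   Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
#                   [Types.STRING(), Types.STRING(), Types.STRING(),
#                    Types.LONG(), Types.LONG()])
FIELD_NAMES = ["id", "tp", "account", "device_ts", "timestamp"]
FIELD_TYPES = ["STRING", "STRING", "STRING", "LONG", "LONG"]

# Hypothetical mapping from declared type names to Python types.
_PY_TYPES = {"STRING": str, "LONG": int}


def check_record(names: Sequence[str], types: Sequence[str],
                 record: Sequence[Any]) -> List[str]:
    """Return a list of problems; an empty list means the record fits the declaration."""
    problems: List[str] = []
    if len(names) != len(types):
        problems.append(f"{len(names)} field names but {len(types)} field types")
    if len(record) != len(types):
        problems.append(f"record has {len(record)} values but {len(types)} types are declared")
    for name, type_name, value in zip(names, types, record):
        if not isinstance(value, _PY_TYPES[type_name]):
            problems.append(f"field {name!r}: expected {type_name}, got {type(value).__name__}")
    return problems


if __name__ == "__main__":
    # A record that includes the timestamp matches the 5-field declaration.
    print(check_record(FIELD_NAMES, FIELD_TYPES, ["k1", "t1", "TEST", 42, 1000]))
    # Dropping the timestamp from the record is reported.
    print(check_record(FIELD_NAMES, FIELD_TYPES, ["k1", "t1", "TEST", 42]))
```

An empty result means the record's length and value types line up with the declared names and types.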


[1] https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759

Best,
Xingbo

> On Jan 18, 2021, at 9:21 PM, meneldor <menel...@gmail.com> wrote:
> 
> Actually the output_type_info is OK; it was a copy/paste typo. I changed the 
> function to:
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1],
>                         account="TEST",
>                         device_ts=value[3][2],
>                         timestamp=ctx.timestamp())
>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
> 
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>         yield types.Row(id=ctx.get_current_key()[0],
>                         tp=ctx.get_current_key()[1],
>                         account="TEST",
>                         device_ts=1111111111,
>                         timestamp=timestamp)
> And the type to:
> output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
>                                    [Types.STRING(), Types.STRING(), Types.STRING(),
>                                     Types.LONG(), Types.LONG()])
> I can't return the same data in on_timer() because there is no value 
> parameter. That's why I hardcoded device_ts. However, the exception persists. 
> I am not sure if the timestamp field should be included in output_type_info 
> as I did now.
> Regards
> 
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
> Hi meneldor, 
> 
> Actually, the return type of on_timer() must be the same as that of 
> process_element(). It seems that the value yielded by process_element() is 
> missing the `timestamp` field. Also, `output_type_info` has four field 
> names but five field types. Could you align them?
> 
> Best,
> Shuiqiang
