Hi Shuiqiang, meneldor,

1. In fact, there is a problem with using a Python `Named Row` as the return value of a user-defined function in PyFlink.
When a Row is serialized, the field serializers are applied by position, in the order of the Row's fields. However, the fields of a Python `Named Row` are sorted by field name, a design meant to make Named Rows easier to compare and hash. As a result, the serializers no longer line up with the fields they were declared for, which leads to serialization/deserialization errors. Serializers are matched by position rather than by field name for performance reasons. `Named Row` support could be added for ease of use at the cost of a little performance. For the current example, I suggest returning a list or a plain (positional) Row instead of a Named Row.

2. In PyFlink 1.12.0, results are emitted through an `out: Collector` parameter. The signature of `process_element` should be `def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector)`, and `on_timer` likewise takes a trailing `out: Collector` parameter[1]. If you want to emit data from `on_timer`, call `Collector.collect` on that parameter in the same way, e.g.

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
        out.collect(Row('a', 'b', 'c'))

3. >>> I am not sure if the timestamp field should be included in output_type_info as i did now.

If the data you return contains a `timestamp` field, `output_type_info` must include that field too. The data returned in your example contains `timestamp`, so your `output_type_info` needs the type information for this field.

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759

Best,
Xingbo

> On Jan 18, 2021, at 9:21 PM, meneldor <menel...@gmail.com> wrote:
>
> Actually, the output_type_info is OK; it was a copy/paste typo.
> I changed the function to:
>
>     class MyProcessFunction(KeyedProcessFunction):
>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>             yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
>                             account="TEST", device_ts=value[3][2], timestamp=ctx.timestamp())
>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>         def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>             yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
>                             account="TEST", device_ts=1111111111, timestamp=timestamp)
>
> And the type to:
>
>     output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
>                                        [Types.STRING(), Types.STRING(), Types.STRING(),
>                                         Types.LONG(), Types.LONG()])
>
> I can't return the same data in on_timer() because there is no value parameter. That's why I hardcoded device_ts. However, the exception persists.
> I am not sure if the timestamp field should be included in output_type_info as I did now.
>
> Regards
>
> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
>
> Hi meneldor,
>
> Actually, the return type of on_timer() must be the same as that of process_element(). It seems that the value yielded by process_element() is missing the `timestamp` field. Also, the `output_type_info` has four field names but five field types. Could you align them?
>
> Best,
> Shuiqiang
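To make the field-order problem from point 1 concrete without a PyFlink installation, here is a minimal pure-Python sketch. `make_named_row`, `make_positional_row` and `mismatched_fields` are hypothetical helpers, not PyFlink APIs: `make_named_row` only mimics how a Named Row built from keyword arguments sorts its fields by name, and plain Python types stand in for the per-field serializers declared in `output_type_info`.

```python
# Minimal pure-Python sketch (no PyFlink required). The helpers are
# hypothetical stand-ins, not PyFlink APIs: plain Python types play the role
# of the per-field serializers declared in output_type_info.

# Field order as declared in output_type_info in the quoted example.
DECLARED_NAMES = ['id', 'tp', 'account', 'device_ts', 'timestamp']
DECLARED_TYPES = [str, str, str, int, int]  # STRING x3, LONG x2


def make_named_row(**kwargs):
    """Mimic a Named Row built from keyword arguments: fields sorted by name."""
    names = sorted(kwargs)
    return names, [kwargs[name] for name in names]


def make_positional_row(*values):
    """Mimic a plain Row / list: values keep the order they were passed in."""
    return list(values)


def mismatched_fields(values, types):
    """Pair each value with the 'serializer' at the same position, as a
    position-based serializer would, and return the pairs that do not fit."""
    if len(values) != len(types):
        raise ValueError(f"row has {len(values)} fields, type info has {len(types)}")
    return [(value, expected.__name__)
            for value, expected in zip(values, types)
            if not isinstance(value, expected)]


if __name__ == '__main__':
    names, named_values = make_named_row(id='k1', tp='t1', account='TEST',
                                         device_ts=1111111111, timestamp=1500)
    print(names)  # ['account', 'device_ts', 'id', 'timestamp', 'tp']
    print(names == DECLARED_NAMES)  # False: order differs from the declaration
    print(mismatched_fields(named_values, DECLARED_TYPES))
    # [(1111111111, 'str'), ('t1', 'int')]

    positional = make_positional_row('k1', 't1', 'TEST', 1111111111, 1500)
    print(mismatched_fields(positional, DECLARED_TYPES))  # []
```

The positional row keeps the declared order, so every value meets the "serializer" it was declared for. The arity check in `mismatched_fields` also flags a row whose field count differs from the type information, similar in spirit to the names/types mismatch Shuiqiang pointed out.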