Actually the *output_type_info* is OK; it was a copy/paste typo. I changed the function to:

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=value[3][2], timestamp=ctx.timestamp())
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=1111111111, timestamp=timestamp)

And the type to:

output_type_info = Types.ROW_NAMED(
    ['id', 'tp', 'account', 'device_ts', 'timestamp'],
    [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()])

I can't return the same data in *on_timer()* because it has no value parameter. That's why I hardcoded *device_ts*. However, the exception persists.
I am not sure whether the timestamp field should be included in *output_type_info*, as I have done now.

Regards

On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi meneldor,
>
> Actually, the return type of the on_timer() must be the same as
> process_element(). It seems that the yield value of process_element() is
> missing the `timestamp` field. And the `output_type_info` has four field
> names but with 5 field types. Could you align them?
>
> Best,
> Shuiqiang