Actually the *output_type_info* is ok, it was copy/paste typo. I changed
the function to:
class MyProcessFunction(KeyedProcessFunction):
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
yield types.Row(id=ctx.get_current_key()[0],
tp=ctx.get_current_key()[1], account="TEST", device_ts=value[3][2],
timestamp=ctx.timestamp())
ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield types.Row(id=ctx.get_current_key()[0],
tp=ctx.get_current_key()[1], account="TEST", device_ts=1111111111,
timestamp=timestamp)
And the type to:
output_type_info = Types.ROW_NAMED(['id', 'tp', 'account',
'device_ts', 'timestamp'],
[Types.STRING(), Types.STRING(),
Types.STRING(), Types.LONG(), Types.LONG()])
I cant return the same data in *on_timer()* because there is no value
parameter. Thats why i hardcoded *device_ts*. However the exception
persists.
I am not sure if the timestamp field should be included in
*output_type_info* as i did now.
Regards
On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <[email protected]> wrote:
> Hi meneldor,
>
> Actually, the return type of the on_timer() must be the same as
> process_element(). It seems that the yield value of process_element() is
> missing the `timestamp` field. And the `output_type_info` has four field
> names but with 5 field types. Could you align them?
>
> Best,
> Shuiqiang
>