Actually the *output_type_info* is ok, it was copy/paste typo. I changed
the function to:

class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0],
tp=ctx.get_current_key()[1], account="TEST", device_ts=value[3][2],
timestamp=ctx.timestamp())
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield types.Row(id=ctx.get_current_key()[0],
tp=ctx.get_current_key()[1], account="TEST", device_ts=1111111111,
timestamp=timestamp)

And the type to:

output_type_info = Types.ROW_NAMED(['id', 'tp', 'account',
'device_ts', 'timestamp'],
                                     [Types.STRING(), Types.STRING(),
Types.STRING(), Types.LONG(), Types.LONG()])

I cant return the same data in *on_timer()* because there is no value
parameter. Thats why i hardcoded *device_ts*. However the exception
persists.

I am not sure if the timestamp field should be included in
*output_type_info* as i did now.

Regards


On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi meneldor,
>
> Actually, the return type of the on_timer() must be the same as
> process_element(). It seems that the yield value of process_element() is
> missing the `timestamp` field.  And the `output_type_info` has four field
> names but with 5 field types. Could you align them?
>
> Best,
> Shuiqiang
>

Reply via email to