I am trying to sink the rowtime field in pyflink 1.10. I get the following error

For the source schema I use

    .field("rowtime", DataTypes.TIMESTAMP(2))
        .rowtime(
            Rowtime()
            .timestamps_from_field("timestamp")
            .watermarks_periodic_ascending()
        )

To create the rowtime field and have tried variations on

    .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

In the sink schema.

Trying all of the different types in DataTypes I get essentially the following 
error:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.
: org.apache.flink.table.api.ValidationException: Field types of query result 
and registered TableSink `default_catalog`.`default_database`.`output` do not 
match.
Query result schema: [rowtime: LocalDateTime]
TableSink schema:    [rowtime: Timestamp]


I know that in Java there is 
org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python 
documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding 
type in the python library. Can anyone help point me to the correct type for 
the schema?

Thanks,
Jesse





Reply via email to