Hi,

I have a source table that uses the Kinesis connector to read events from AWS EventBridge, using PyFlink 1.15.0. An example of the kind of data in this stream is here: https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref. Note that the stream contains many different types of events, and the 'detail' field is completely different from one event type to another. The PyFlink DataStream API does not support this connector, so I use the Table API to construct the source table. The table looks like this:

    CREATE TABLE events (
        `id` VARCHAR,
        `source` VARCHAR,
        `account` VARCHAR,
        `region` VARCHAR,
        `detail-type` VARCHAR,
        `detail` VARCHAR,
        `resources` VARCHAR,
        `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
        WATERMARK FOR `time` AS `time` - INTERVAL '30' SECOND,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        ...
    )

The table was created using:

    table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing that is easier to do in the DataStream API:

    events_stream_table = table_env.from_path('events')
    events_stream = table_env.to_data_stream(events_stream_table)

    # now do some processing - let's filter by the type of event we get
    codebuild_stream = events_stream.filter(
        lambda event: event['source'] == 'aws.codebuild'
    )

    # now do other stuff on a stream containing only events that are identical in shape
    ...
    # maybe convert back into a Table and perform SQL on the data

When I run this, I get an exception:

    org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. Only data types that originated from type information fully support a reverse conversion.

Somebody reported a similar error here: https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception

When I try the suggestion there and replace "TIMESTAMP(0) WITH LOCAL TIME ZONE" with "TIMESTAMP(3)", I get a different exception:

    TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.

Is there a way of converting this Table into a DataStream (and then back again)? I need to use the data in the "time" field as the source of watermarks for my events.

Many thanks,

John