Hi John,

This seems like a bug, and I have created a ticket, https://issues.apache.org/jira/browse/FLINK-28253, to track it.
For now, you could try replacing to_data_stream with to_append_stream to see if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper <john_tip...@hotmail.com> wrote:

> Hi,
>
> I have a source table using a Kinesis connector reading events from AWS
> EventBridge using PyFlink 1.15.0. An example of the sorts of data that are
> in this stream is here:
> https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
> Note that the stream of data contains many different types of events, where
> the 'detail' field is completely different between different event types.
> There is no support for this connector using PyFlink DataStream API, so I
> use the Table API to construct the source table. The table looks like this:
>
> CREATE TABLE events (
>     `id` VARCHAR,
>     `source` VARCHAR,
>     `account` VARCHAR,
>     `region` VARCHAR,
>     `detail-type` VARCHAR,
>     `detail` VARCHAR,
>     `source` VARCHAR,
>     `resources` VARCHAR,
>     `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
>     WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
>     PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>     ...
> )
>
> The table was created using:
>
> table_env.execute_sql(CREATE_STRING_ABOVE)
>
> I'd like to turn this table into a data stream so I can perform some
> processing that is easier to do in the DataStream API:
>
> events_stream_table = table_env.from_path('events')
>
> events_stream = table_env.to_data_stream(events_stream_table)
>
> # now do some processing - let's filter by the type of event we get
>
> codebuild_stream = events_stream.filter(
>     lambda event: event['source'] == 'aws.codebuild'
> )
>
> # now do other stuff on a stream containing only events that are identical in shape
> ...
> # maybe convert back into a Table and perform SQL on the data
>
> When I run this, I get an exception:
>
> org.apache.flink.table.api.TableException: Unsupported conversion from data type
> 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to
> type information. Only data types that originated from type information fully
> support a reverse conversion.
>
> Somebody reported a similar error here
> (https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception).
> When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL
> TIME ZONE" with a "TIMESTAMP(3)" I get a different exception:
>
> TypeError: The java type info: LocalDateTime is not supported in PyFlink
> currently.
>
> Is there a way of converting this Table into a DataStream (and then back
> again)? I need to use the data in the "time" field as the source of
> watermarks for my events.
>
> Many thanks,
>
> John
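
For reference, the per-event logic described in the quoted message (keep only CodeBuild events, then look inside 'detail') can be sketched in plain Python, independent of the conversion problem. This is only an illustration under stated assumptions: plain dicts stand in for the stream's rows, the helper names is_codebuild_event and parse_detail are hypothetical, the sample records are made up, and the 'detail' VARCHAR column is assumed to hold JSON text.

```python
import json


def is_codebuild_event(event):
    """Same predicate as the lambda passed to filter() in the quoted message."""
    return event['source'] == 'aws.codebuild'


def parse_detail(event):
    """Decode the 'detail' column, assumed here to contain JSON text."""
    return json.loads(event['detail'])


# Hypothetical sample records; plain dicts stand in for the stream's rows.
sample_events = [
    {'id': '1', 'source': 'aws.codebuild',
     'detail': '{"build-status": "SUCCEEDED"}'},
    {'id': '2', 'source': 'aws.ec2',
     'detail': '{"state": "running"}'},
]

# Keep only the CodeBuild events, then decode their 'detail' payloads.
codebuild_events = [e for e in sample_events if is_codebuild_event(e)]
details = [parse_detail(e) for e in codebuild_events]
```

Once the Table can be converted to a stream, functions like these could be passed to filter() and map() in place of the inline lambda, so that each downstream stream only carries events of one shape.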