Hi John,

This seems like a bug, and I have created a ticket
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing `to_data_stream` with `to_append_stream` to
see if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper <john_tip...@hotmail.com> wrote:

> Hi,
>
> I have a source table that uses the Kinesis connector to read events from AWS
> EventBridge in PyFlink 1.15.0. An example of the sort of data in this
> stream is here:
> https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
> Note that the stream contains many different types of events, and the
> 'detail' field has a completely different shape for each event type.
> The PyFlink DataStream API has no support for this connector, so I
> use the Table API to construct the source table. The table looks like this:
>
>
> CREATE TABLE events (
>      `id` VARCHAR,
>      `source` VARCHAR,
>      `account` VARCHAR,
>      `region` VARCHAR,
>      `detail-type` VARCHAR,
>      `detail` VARCHAR,
>      `resources` VARCHAR,
>      `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
>      WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
>      PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> ...
> )
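The WATERMARK FOR time AS time - INTERVAL '30' SECOND clause declares a bounded out-of-orderness watermark: roughly, the watermark trails the largest `time` seen so far by 30 seconds and never moves backwards, and an event whose `time` is at or behind the current watermark counts as late. The sketch below is a minimal pure-Python illustration of that arithmetic only; the class `BoundedWatermark` is hypothetical and not a Flink API. In the PyFlink DataStream API the analogous built-in is `WatermarkStrategy.for_bounded_out_of_orderness`, whose exact edge-case behaviour may differ slightly from this sketch.

```python
from datetime import datetime, timedelta
from typing import Optional


class BoundedWatermark:
    """Hypothetical helper: a watermark trailing the max event time by a fixed delay."""

    def __init__(self, max_delay: timedelta) -> None:
        self.max_delay = max_delay
        self.max_event_time: Optional[datetime] = None

    def observe(self, event_time: datetime) -> datetime:
        # Only a newer event time advances the watermark; out-of-order events do not.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return self.current()

    def current(self) -> datetime:
        if self.max_event_time is None:
            raise ValueError("no events observed yet")
        return self.max_event_time - self.max_delay

    def is_late(self, event_time: datetime) -> bool:
        # An event at or behind the current watermark is treated as late.
        return self.max_event_time is not None and event_time <= self.current()


wm = BoundedWatermark(timedelta(seconds=30))
t0 = datetime(2022, 6, 25, 4, 0, 0)
print(wm.observe(t0))                          # 2022-06-25 03:59:30
print(wm.observe(t0 + timedelta(seconds=50)))  # 2022-06-25 04:00:20
print(wm.observe(t0 + timedelta(seconds=10)))  # out-of-order event: still 04:00:20
print(wm.is_late(t0 + timedelta(seconds=15)))  # True
```

Keeping the watermark derived from `time` is what the DDL does on the Table side; whatever conversion is used to reach the DataStream API, the same 30-second bound would need to be re-declared there if the timestamps do not survive the conversion.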
>
>
>
> The table was created using:
>
>  table_env.execute_sql(CREATE_STRING_ABOVE)
>
> I'd like to turn this table into a data stream so I can perform some
> processing that is easier to do in the DataStream API:
>
>
> events_stream_table = table_env.from_path('events')
>
> events_stream = table_env.to_data_stream(events_stream_table)
>
> # now do some processing - let's filter by the type of event we get
>
> codebuild_stream = events_stream.filter(
>     lambda event: event['source'] == 'aws.codebuild'
> )
>
> # now do other stuff on a stream containing only events that are identical
> # in shape
> ...
> # maybe convert back into a Table and perform SQL on the data
>
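Because `detail` is declared as a VARCHAR holding JSON whose shape depends on the event type, one common pattern is to filter on `source` first and only then parse `detail` into a type-specific structure. The following is a minimal pure-Python sketch of per-event functions that could be passed to `filter` and `map`, assuming each event can be indexed by the column names from the DDL (plain dicts stand in for PyFlink rows here). The names `is_codebuild_event`, `parse_codebuild_detail`, and `CodeBuildStatus`, and the `project-name`/`build-status` keys, are hypothetical choices based on the linked CodeBuild sample, not something checked against the actual stream.

```python
import json
from typing import Any, Dict, Mapping, NamedTuple


class CodeBuildStatus(NamedTuple):
    # Hypothetical flattened shape for CodeBuild build state change events.
    id: str
    project_name: str
    build_status: str


def is_codebuild_event(event: Mapping[str, Any]) -> bool:
    # Same predicate as the lambda passed to events_stream.filter(...).
    return event["source"] == "aws.codebuild"


def parse_codebuild_detail(event: Mapping[str, Any]) -> CodeBuildStatus:
    # `detail` arrives as a JSON string whose keys depend on the event type,
    # so it is only parsed after filtering on `source`.
    detail: Dict[str, Any] = json.loads(event["detail"])
    return CodeBuildStatus(
        id=event["id"],
        project_name=detail.get("project-name", ""),
        build_status=detail.get("build-status", ""),
    )


events = [
    {"id": "1", "source": "aws.codebuild",
     "detail": json.dumps({"project-name": "my-project", "build-status": "SUCCEEDED"})},
    {"id": "2", "source": "aws.s3", "detail": json.dumps({"bucket": {"name": "b"}})},
]
codebuild = [parse_codebuild_detail(e) for e in events if is_codebuild_event(e)]
print(codebuild)
# [CodeBuildStatus(id='1', project_name='my-project', build_status='SUCCEEDED')]
```

In the job, `is_codebuild_event` could replace the lambda in `events_stream.filter(...)`. Mapping the parsed results back into a Table would additionally require declaring an `output_type` for `map` (for example a named Row type), which this sketch leaves out.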
>
> When I run this, I get an exception:
>
>
> org.apache.flink.table.api.TableException: Unsupported conversion from data
> type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime)
> to type information. Only data types that originated from type information
> fully support a reverse conversion.
>
>
> Somebody reported a similar error here:
> https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception
> When I try the suggestion there and replace "TIMESTAMP(0) WITH LOCAL
> TIME ZONE" with "TIMESTAMP(3)", I get a different exception:
>
> TypeError: The java type info: LocalDateTime is not supported in PyFlink
> currently.
>
>
> Is there a way of converting this Table into a DataStream (and then back
> again)? I need to use the data in the "time" field as the source of
> watermarks for my events.
>
> Many thanks,
>
> John
>
