Big +1 for this FLIP. I've recently been working with some Kafka topics that carry their timestamps as record metadata, not in the message body. I want to declare tables over these topics with DDL, but "rowtime_column_name" in <watermark_definition> seems to accept only existing physical columns.
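For example, here is the kind of table I can declare today (the table name, schema, and connector options below are only illustrative):

CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  ts TIMESTAMP(3),  -- must be a field of the message body itself
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'clicks',
  'format.type' = 'json'
);

This only works because ts exists in the payload. There is no way to populate ts from the Kafka record timestamp (the ConsumerRecord metadata), which is where my topics carry it.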
> <watermark_definition>:
>   WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

I raised this on the user@ list, but committers advised alternative approaches that call for detailed knowledge of Flink, such as writing a custom decoding format or converting between the DataStream API and TableEnvironment. That works against the main advantage of Flink SQL: simplicity and ease of use. IMHO this FLIP must be implemented so that users can freely derive tables from any Kafka topic without having to involve the DataStream API. To make the use case concrete, I've put an illustrative DDL sketch at the bottom of this mail.

Best,

Dongwon

On 2020/03/01 14:30:31, Dawid Wysakowicz <d...@apache.org> wrote:
> Hi,
>
> I would like to propose an improvement that would enable reading table
> columns from different parts of source records. Besides the main payload,
> the majority (if not all) of the sources expose additional information. It
> can be simply read-only metadata such as offset or ingestion time, or
> read/write parts of the record that contain data but additionally serve
> different purposes (partitioning, compaction, etc.), e.g. key or
> timestamp in Kafka.
>
> We should make it possible to read and write data from all of those
> locations. In this proposal I discuss reading partitioning data; for
> completeness, it also discusses partitioning when writing data out.
>
> I am looking forward to your comments.
>
> You can access the FLIP here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
>
> Best,
>
> Dawid
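P.S. The DDL sketch mentioned above: what I would like to be able to express is something along these lines. The SYSTEM_METADATA-style accessor is purely hypothetical here and only meant to illustrate the idea of exposing record metadata as a column, not to suggest concrete syntax:

CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  -- hypothetical accessor exposing the Kafka record timestamp as a column
  ts AS SYSTEM_METADATA('timestamp'),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'clicks',
  'format.type' = 'json'
);

With something like this, an event-time table could be declared over any topic purely in DDL.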