[ https://issues.apache.org/jira/browse/FLINK-36688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Nuyanzin resolved FLINK-36688.
-------------------------------------
    Resolution: Fixed

Merged as [5c87afeceffc52d2e0a24cf547bae6eda4075cfe|https://github.com/apache/flink/commit/5c87afeceffc52d2e0a24cf547bae6eda4075cfe]

> table.optimizer.reuse-source-enabled may cause disordered metadata columns when reading from Kafka.
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36688
>                 URL: https://issues.apache.org/jira/browse/FLINK-36688
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>            Reporter: Yanquan Lv
>            Priority: Major
>              Labels: pull-request-available
>
> Metadata columns in Kafka must keep a fixed order: the format's metadata
> comes first, and the metadata of Kafka itself (partition, offset, and so on)
> comes last. The Kafka connector appends the format's metadata fields first
> and Kafka's own metadata fields afterwards.
> However, the reused source does not preserve this order, which may cause a
> ClassCastException.
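A minimal sketch of the ordering rule described above, assuming format metadata keys are recognized by the `value.` prefix (as `value.ingestion-timestamp` is in the reproduction below). The class `MetadataKeyOrder` and its methods are hypothetical illustrations, not Flink planner or Kafka connector code, and not the merged fix. The sketch reorders the metadata keys of the reused `TableSourceScan` from the explained plan into format-first order and, as a simplified analogy, shows how reading values positionally under the wrong order ends in a ClassCastException.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of the metadata ordering rule from FLINK-36688.
 * Not Flink planner or Kafka connector code, and not the merged fix.
 */
final class MetadataKeyOrder {

    // Assumption: format metadata keys carry this prefix (e.g. "value.ingestion-timestamp").
    static final String FORMAT_PREFIX = "value.";

    /** Reorders keys the way the connector appends them: format metadata first, then Kafka metadata. */
    static List<String> connectorOrder(List<String> requested) {
        List<String> formatKeys = new ArrayList<>();
        List<String> kafkaKeys = new ArrayList<>();
        for (String key : requested) {
            if (key.startsWith(FORMAT_PREFIX)) {
                formatKeys.add(key);
            } else {
                kafkaKeys.add(key);
            }
        }
        // Relative order within each group is preserved.
        List<String> ordered = new ArrayList<>(formatKeys);
        ordered.addAll(kafkaKeys);
        return ordered;
    }

    /** True if the declared metadata order matches the order in which values are emitted. */
    static boolean isConsistent(List<String> declared) {
        return declared.equals(connectorOrder(declared));
    }

    public static void main(String[] args) {
        // Metadata order of the reused TableSourceScan in the explained plan.
        List<String> declared = List.of("partition", "value.ingestion-timestamp", "offset");
        System.out.println(connectorOrder(declared)); // [value.ingestion-timestamp, partition, offset]
        System.out.println(isConsistent(declared));   // false

        // Simplified analogy: values are emitted in connector order (timestamp, partition, offset),
        // but a reader trusting the declared order expects an INT partition at position 0.
        Object[] emitted = {Instant.EPOCH, 3, 42L};
        try {
            Integer partition = (Integer) emitted[0];
            System.out.println("partition = " + partition);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: position 0 holds " + emitted[0].getClass().getSimpleName());
        }
    }
}
```

Running `main` prints the format-first order, `false` for the declared order, and `ClassCastException: position 0 holds Instant`. Splitting the keys into two groups, rather than sorting, keeps the relative order inside each group, which is what "format first, Kafka last" requires.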
>
> How to reproduce:
> {code:sql}
> create temporary table `message_channel_task_record`
> (
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
>   ,`partition` INT METADATA VIRTUAL
>   ,`offset` BIGINT METADATA VIRTUAL
>   ,id BIGINT comment 'Auto-increment ID'
>   ,PRIMARY KEY (`id`) NOT ENFORCED
> )
> with (
>   'connector'='kafka'
>   xxx
> )
> ;
>
> create temporary table `sink`
> (
>   origin_ts TIMESTAMP(3)
>   ,`partition` INT
>   ,`offset` BIGINT
>   ,id BIGINT
> )
> WITH (
>   'connector'='print'
> )
> ;
>
> create temporary table `sr_sink`
> (
>   id BIGINT comment 'Auto-increment ID'
> )
> WITH (
>   'connector'='print'
> )
> ;
>
> -- EXPLAIN STATEMENT SET BEGIN
> BEGIN STATEMENT SET;
> INSERT INTO sink
> SELECT
>   origin_ts
>   ,`partition`
>   ,`offset`
>   ,id
> FROM message_channel_task_record
> ;
> INSERT INTO `sr_sink`
> SELECT
>   id
> FROM `message_channel_task_record`
> ;
> END
> ;
> {code}
>
> Explained plan:
> {noformat}
> [558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], fields=[id, partition, origin_ts, offset])
> :- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, id])
> :  +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, offset, id])
> +- [562]:Sink(table=[vvp.default.sr_sink], fields=[id])
> {noformat}
>
> Expected metadata column order: value.ingestion-timestamp (format), partition, offset.
> Actual metadata column order: partition, value.ingestion-timestamp (format), offset.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)