[ 
https://issues.apache.org/jira/browse/FLINK-36688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-36688.
-------------------------------------
    Resolution: Fixed

Merged as 
[5c87afeceffc52d2e0a24cf547bae6eda4075cfe|https://github.com/apache/flink/commit/5c87afeceffc52d2e0a24cf547bae6eda4075cfe]

> table.optimizer.reuse-source-enabled may cause disordered metadata columns 
> when reading from Kafka.  
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36688
>                 URL: https://issues.apache.org/jira/browse/FLINK-36688
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>            Reporter: Yanquan Lv
>            Priority: Major
>              Labels: pull-request-available
>
> Metadata columns in Kafka need to maintain a fixed order: the metadata for 
> the format needs to come first, while the metadata for Kafka itself 
> (partition, offset, and so on) needs to come last. The Kafka connector 
> adds the format's fields first and then adds Kafka's own fields.
> However, a reused source does not maintain this order, which may cause a 
> ClassCastException.
> How to reproduce:
> {code:sql}
> create temporary table `message_channel_task_record`
> (
> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
> ,`partition` INT METADATA VIRTUAL
> ,`offset` BIGINT METADATA VIRTUAL
> ,id BIGINT comment 'auto-increment ID'
> ,PRIMARY KEY (`id`) NOT ENFORCED
> )
> with (
> 'connector'='kafka'
> xxx
> )
> ;
> create temporary table `sink`
> (
> origin_ts TIMESTAMP(3)
> ,`partition` INT
> ,`offset` BIGINT
> ,id BIGINT
> )
> WITH (
> 'connector'='print'
> )
> ;
> create temporary table `sr_sink`
> (
> id BIGINT comment 'auto-increment ID'
> )
> WITH (
> 'connector'='print'
> )
> ;
> -- EXPLAIN STATEMENT SET BEGIN
> BEGIN STATEMENT SET;
> INSERT INTO sink
> SELECT
> origin_ts
> ,`partition`
> ,`offset`
> ,id
> FROM message_channel_task_record
> ;
> INSERT INTO `sr_sink`
> SELECT
> id
> FROM `message_channel_task_record`
> ;
> END
> ;
>  {code}
> Explained plan:
> {code:java}
>  [558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, 
> project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], 
> fields=[id, partition, origin_ts, offset])
> :- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, 
> CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, 
> id])
> :  +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, 
> offset, id])
>    +- [562]:Sink(table=[vvp.default.sr_sink], fields=[id]) {code}
> The expected metadata column order is: value.ingestion-timestamp (format), 
> partition, offset.
> The actual metadata column order is: partition, 
> value.ingestion-timestamp (format), offset.
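>
> A possible workaround sketch for the affected versions, not part of the 
> original report: the wrong order appears only when the source is reused, so 
> turning off the option named in the title should make the planner create a 
> separate scan for each INSERT. This is an assumption and has not been 
> verified against this issue; it also means the topic is read once per sink.
> {code:sql}
> -- Assumption: executed in the same session, before BEGIN STATEMENT SET.
> -- Disables source reuse so each INSERT gets its own TableSourceScan.
> SET 'table.optimizer.reuse-source-enabled' = 'false';
> {code}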



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
