[ https://issues.apache.org/jira/browse/FLINK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Francesco Guardiani updated FLINK-26549:
----------------------------------------
    Attachment: FlinkTypeFactoryTest.patch

> INSERT INTO with VALUES leads to wrong type inference with nested types
> -----------------------------------------------------------------------
>
>                 Key: FLINK-26549
>                 URL: https://issues.apache.org/jira/browse/FLINK-26549
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Francesco Guardiani
>            Assignee: Francesco Guardiani
>            Priority: Major
>         Attachments: FlinkTypeFactoryTest.patch, InsertIntoValuesTest.patch, Least_restrictive_issue.patch
>
> While working on casting, I found an interesting bug in the INSERT INTO VALUES type inference. It comes from {{KafkaTableITCase#testKafkaSourceSinkWithMetadata}} (look at this version in particular: https://github.com/apache/flink/blob/567440115bcacb5aceaf3304e486281c7da8c14f/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java).
> The test scenario is an INSERT INTO VALUES query that also pushes some metadata to a Kafka table; in particular, it writes the headers metadata.
> The table is declared like this:
> {code:sql}
> CREATE TABLE kafka (
>   `physical_1` STRING,
>   `physical_2` INT,
>   `timestamp-type` STRING METADATA VIRTUAL,
>   `timestamp` TIMESTAMP(3) METADATA,
>   `leader-epoch` INT METADATA VIRTUAL,
>   `headers` MAP<STRING, BYTES> METADATA,
>   `partition` INT METADATA VIRTUAL,
>   `topic` STRING METADATA VIRTUAL,
>   `physical_3` BOOLEAN
> ) WITH (
>   'connector' = 'kafka',
>   [...]
> )
> {code}
> The INSERT INTO query looks like:
> {code:sql}
> INSERT INTO kafka VALUES
>   ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', x'C0FFEE', 'k2', x'BABE'], TRUE),
>   ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),
>   ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)
> {code}
> Note that in the first row the byte literals are of length 3 and 2, while in the last row they are of length 1.
> The generated plan of this INSERT INTO is:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[CAST($3):(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 3'], EXPR$1=[3], EXPR$2=[2020-03-10 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1))], EXPR$4=[true])
>          +- LogicalValues(tuples=[[{ 0 }]])
>
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    :- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'10':BINARY(1), _UTF-16LE'k2', X'20':BINARY(1)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (CHAR(2), BINARY(1)) MAP) AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    :- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Reused(reference_id=[1])
>    +- Calc(select=['data 3' AS physical_1, 3 AS physical_2, true AS physical_3, CAST(MAP('k1', X'10', 'k2', X'20') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-10 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}
> As you can see, in the _Abstract Syntax Tree_ section a cast for the headers is injected (although unnecessary, as it should be an identity cast), but then in the _Optimized Physical Plan_ another cast is injected:
> {code}
> CAST(CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (CHAR(2) CHARACTER SET "UTF-16LE" NOT NULL, BINARY(1) NOT NULL) MAP) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers
> {code}
> This makes no sense: it casts the values of the map first to {{BINARY(1)}} and then to {{BYTES}}, trimming the last 2 bytes.
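> To see why the intermediate {{BINARY(1)}} cast is lossy, here is a minimal Python sketch of SQL binary-cast semantics. This is an illustration only, not Flink code: the {{cast_to_binary}} and {{cast_to_varbinary}} helpers are hypothetical names modeling the standard truncate-or-pad behavior of casts to fixed- and variable-length binary types.

```python
def cast_to_binary(value: bytes, n: int) -> bytes:
    """Model CAST(x AS BINARY(n)): truncate to n bytes, or right-pad with zero bytes."""
    return value[:n] if len(value) >= n else value + b"\x00" * (n - len(value))

def cast_to_varbinary(value: bytes, max_len: int) -> bytes:
    """Model CAST(x AS VARBINARY(n)): truncate to at most n bytes, no padding."""
    return value[:max_len]

original = bytes.fromhex("c0ffee")  # X'C0FFEE', 3 bytes

# The buggy plan casts the map value first to BINARY(1), then to BYTES
# (i.e. VARBINARY(2147483647)):
after_inner = cast_to_binary(original, 1)                # already b'\xc0': 2 bytes lost
after_outer = cast_to_varbinary(after_inner, 2**31 - 1)  # identity at this point

print(after_outer.hex())  # prints "c0", not "c0ffee"
```

> The outer cast to the declared sink type cannot recover the bytes dropped by the inner cast, which is why the inferred {{BINARY(1)}} element type corrupts the first row's headers.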
> Removing the last row to insert makes the VALUES type inference work properly:
> {code}
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- LogicalProject(physical_1=[$0], physical_2=[$1], physical_3=[$4], headers=[$3], timestamp=[CAST($2):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
>    +- LogicalUnion(all=[true])
>       :- LogicalProject(EXPR$0=[_UTF-16LE'data 1'], EXPR$1=[1], EXPR$2=[2020-03-08 13:12:11.123:TIMESTAMP(3)], EXPR$3=[MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3))], EXPR$4=[true])
>       :  +- LogicalValues(tuples=[[{ 0 }]])
>       +- LogicalProject(EXPR$0=[_UTF-16LE'data 2'], EXPR$1=[2], EXPR$2=[2020-03-09 13:12:11.123:TIMESTAMP(3)], EXPR$3=[null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP], EXPR$4=[false])
>          +- LogicalValues(tuples=[[{ 0 }]])
>
> == Optimized Physical Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=[_UTF-16LE'data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP(_UTF-16LE'k1', X'c0ffee':VARBINARY(3), _UTF-16LE'k2', X'babe':VARBINARY(3)) AS (VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>    +- Calc(select=[_UTF-16LE'data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647) CHARACTER SET "UTF-16LE", VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
>
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.kafka], fields=[physical_1, physical_2, physical_3, headers, timestamp])
> +- Union(all=[true], union=[physical_1, physical_2, physical_3, headers, timestamp])
>    :- Calc(select=['data 1' AS physical_1, 1 AS physical_2, true AS physical_3, CAST(MAP('k1', X'c0ffee', 'k2', X'babe') AS (VARCHAR(2147483647), VARBINARY(2147483647)) MAP) AS headers, CAST(2020-03-08 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>    :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
>    +- Calc(select=['data 2' AS physical_1, 2 AS physical_2, false AS physical_3, null:(VARCHAR(2147483647), VARBINARY(2147483647)) MAP AS headers, CAST(2020-03-09 12:12:11.123 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS timestamp])
>       +- Reused(reference_id=[1])
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)