[ 
https://issues.apache.org/jira/browse/FLINK-30899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-30899:
---------------------------------
    Labels: pull-request-available  (was: pull-request-available stale-assigned)

> FileSystemTableSource with CSV format incorrectly selects fields if filtering 
> for partition
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30899
>                 URL: https://issues.apache.org/jira/browse/FLINK-30899
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Mate Czagany
>            Assignee: Mate Czagany
>            Priority: Major
>              Labels: pull-request-available
>
> In my testing it only affected csv and testcsv formats.
>  
> I think it's caused by `FileSystemTableSource` calling 
> `DeserializationFormatFactory#createRuntimeDecoder` with wrong 
> `physicalDataType`. The files won't contain the partitioned field values, but 
> in case of a projection pushdown (which can happen during planning phase if 
> we filter the partition field by a constant value) the final 
> `physicalDataType` passed to the deserializer by `FileSystemTableSource` will 
> contain the partitioned fields as well. As described in `DecodingFormat`, 
> every field in the `physicalDataType` parameter will have to be present in 
> the serialized record.
>  
> Example:
> {code:java}
> CREATE TABLE test_table (
>   f0 INT,
>   f1 INT,
>   f2 INT,
>   f3 INT
> ) PARTITIONED BY (f0,f1) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///path/to/whatever',
>   'format' = 'csv'
> )
> SELECT * FROM test_table WHERE f0 = 1;
> -- !!!! should be 1,4,7,10 !!!! 
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           1 |           4 |          10 |           0 |
> +-------------+-------------+-------------+-------------+
> SELECT * FROM test_table;
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           2 |           5 |           8 |          11 |
> |           1 |           4 |           7 |          10 |
> |           3 |           6 |           9 |          12 |
> +-------------+-------------+-------------+-------------+
> SELECT * FROM test_table WHERE f0>0;
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           1 |           4 |           7 |          10 |
> |           3 |           6 |           9 |          12 |
> |           2 |           5 |           8 |          11 |
> +-------------+-------------+-------------+-------------+
> SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
> ...
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds 
> for length 4
>     at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
>     at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
>     at 
> org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
>     at 
> org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
>     at 
> org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
>     at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> ... {code}
> At 
> [https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147]
>  the `physicalRowDataType` will contain the partition fields as well, but 
> `partitionKeysToExtract` will not contain it since `producedDataType` has 
> been modified in the `applyProjection` method, so it will result in an empty 
> projection. Then on line 154 we construct the final `physicalDataType`, but 
> since `partitionKeysProjections` is empty, it will result with the same value 
> as `physicalDataType` which contains the partition fields too.
> By changing
> {code:java}
>  final Projection partitionKeysProjections = 
> Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);{code}
> to
> {code:java}
>  final Projection partitionKeysProjections = 
> Projection.fromFieldNames(physicalDataType, partitionKeys);{code}
> the issue can be solved. I have verified this solution with 1 and 2 partition 
> keys, with and without metadata columns, with and without virtual columns. 
> But I still need to test this change with other formats.
>  
> If this solution seems correct and a committer could assign me to the JIRA I 
> can start working on it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to