[ https://issues.apache.org/jira/browse/FLINK-30899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mate Czagany updated FLINK-30899:
---------------------------------
    Labels: pull-request-available  (was: pull-request-available stale-assigned)

> FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30899
>                 URL: https://issues.apache.org/jira/browse/FLINK-30899
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Mate Czagany
>            Assignee: Mate Czagany
>            Priority: Major
>              Labels: pull-request-available
>
> In my testing, only the csv and testcsv formats were affected.
>
> I think it's caused by `FileSystemTableSource` calling
> `DecodingFormat#createRuntimeDecoder` (on the format obtained from
> `DeserializationFormatFactory`) with the wrong `physicalDataType`. The data
> files do not contain the values of the partition fields. However, a projection
> pushdown can happen during the planning phase if we filter a partition field by
> a constant value. In that case, the final `physicalDataType` that
> `FileSystemTableSource` passes to the deserializer still contains the partition
> fields. As described in `DecodingFormat`, every field in the `physicalDataType`
> parameter must be present in the serialized record.
>
> Example:
> {code:java}
> CREATE TABLE test_table (
>   f0 INT,
>   f1 INT,
>   f2 INT,
>   f3 INT
> ) PARTITIONED BY (f0, f1) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///path/to/whatever',
>   'format' = 'csv'
> );
>
> SELECT * FROM test_table WHERE f0 = 1;
> -- !!!! should be 1,4,7,10 !!!!
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           1 |           4 |          10 |           0 |
> +-------------+-------------+-------------+-------------+
>
> SELECT * FROM test_table;
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           2 |           5 |           8 |          11 |
> |           1 |           4 |           7 |          10 |
> |           3 |           6 |           9 |          12 |
> +-------------+-------------+-------------+-------------+
>
> SELECT * FROM test_table WHERE f0>0;
> +-------------+-------------+-------------+-------------+
> |          f0 |          f1 |          f2 |          f3 |
> +-------------+-------------+-------------+-------------+
> |           1 |           4 |           7 |          10 |
> |           3 |           6 |           9 |          12 |
> |           2 |           5 |           8 |          11 |
> +-------------+-------------+-------------+-------------+
>
> SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
> ...
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
>     at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
>     at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
>     at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
>     at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
>     at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     ...
> {code}
>
> At
> [https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147]
> the `physicalRowDataType` still contains the partition fields. However,
> `partitionKeysToExtract` does not contain them, because `producedDataType` has
> been modified in the `applyProjection` method. The result is an empty
> projection. On line 154 we then construct the final `physicalDataType`. Because
> `partitionKeysProjections` is empty, the result is identical to the original
> `physicalDataType`, which still contains the partition fields.
>
> The issue can be solved by changing
> {code:java}
> final Projection partitionKeysProjections =
>     Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);
> {code}
> to
> {code:java}
> final Projection partitionKeysProjections =
>     Projection.fromFieldNames(physicalDataType, partitionKeys);
> {code}
> I have verified this fix with one and two partition keys, with and without
> metadata columns, and with and without virtual columns. I still need to test it
> with other formats.
>
> If this solution seems correct and a committer could assign me to the JIRA, I
> can start working on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
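The field-selection mechanism described in this issue can be modeled without Flink. The following is a hypothetical sketch, not Flink's actual code:

- Plain `List<String>` values stand in for `DataType` and `Projection`.
- The helper methods `partitionKeysToExtract` and `formatFields` are invented for illustration. The first is named after the local variable in the issue.
- It assumes, as the issue describes, that the physical row type still lists both partition columns.
- It assumes that for `WHERE f0 = 1 AND f1 = 4` the projected produced type is `f2, f3`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of how the field list handed to the format is derived.
 * Strings stand in for Flink's DataType/Projection; this is NOT Flink code.
 */
public class PartitionProjectionSketch {

    /** Returns the partition keys that still appear in the (projected) produced type. */
    public static List<String> partitionKeysToExtract(List<String> produced, List<String> partitionKeys) {
        List<String> result = new ArrayList<>();
        for (String field : produced) {
            if (partitionKeys.contains(field)) {
                result.add(field);
            }
        }
        return result;
    }

    /** Returns the physical fields minus those selected by the partition-key projection. */
    public static List<String> formatFields(List<String> physical, List<String> partitionProjection) {
        List<String> result = new ArrayList<>();
        for (String field : physical) {
            if (!partitionProjection.contains(field)) {
                result.add(field);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: the physical type still lists all four columns.
        List<String> physical = List.of("f0", "f1", "f2", "f3");
        List<String> partitionKeys = List.of("f0", "f1");
        // Assumption: after pushdown for WHERE f0 = 1 AND f1 = 4, only f2 and f3 are produced.
        List<String> produced = List.of("f2", "f3");

        List<String> toExtract = partitionKeysToExtract(produced, partitionKeys);
        System.out.println("partitionKeysToExtract = " + toExtract);                          // []
        System.out.println("buggy format fields    = " + formatFields(physical, toExtract));     // [f0, f1, f2, f3]
        System.out.println("fixed format fields    = " + formatFields(physical, partitionKeys)); // [f2, f3]
    }
}
```

Under these assumptions, the buggy path hands all four columns to the format, although each file row carries only `f2` and `f3`. Projecting on `partitionKeys`, as proposed in the issue, leaves just those two columns.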