Hi all,

We tried to read some Avro files with Flink SQL (1.16.1) and noticed that the projection pushdown appears to be buggy.

The Avro schema we used has four fields: f1, f2, f3, and f4. Reading the table with "SELECT *", or selecting the first n fields (e.g., "SELECT f1" or "SELECT f1, f2"), works fine. However, querying any field other than a leading prefix (e.g., "SELECT f3") raises a converter/data mismatch exception.

After digging into the code, I found that `AvroFileFormatFactory` generates a `producedDataType` with the projection pushed down, so when building the `AvroToRowDataConverters` it only creates converters for the selected fields. However, the records read by the `DataFileReader` still contain all fields. Specifically, in the following snippet from `AvroToRowDataConverters`, `fieldConverters` holds only the converters for the selected fields, while `record` holds all fields, which leads to a mismatch between converters and data fields. That also explains why selecting the first n fields works: the converters and data fields happen to line up.

```
return avroObject -> {
    IndexedRecord record = (IndexedRecord) avroObject;
    GenericRowData row = new GenericRowData(arity);
    for (int i = 0; i < arity; ++i) {
        // avro always deserialize successfully even though the type isn't matched
        // so no need to throw exception about which field can't be deserialized
        row.setField(i, fieldConverters[i].convert(record.get(i)));
    }
    return row;
};
```

Not sure if any of you have hit this before. If it's confirmed to be a bug, I'll file a ticket and try to fix it.

Best,
Xingcan
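P.S. To make the mismatch concrete without pulling in Flink or Avro, here is a minimal standalone sketch (plain `Object[]` arrays stand in for the Avro `IndexedRecord` and the converter array; the class and method names are mine, not Flink's). It shows why positional conversion only works when the projection is a prefix, and how mapping each output field back to its position in the full record would fix it:

```java
// Illustrative sketch only (hypothetical names, not Flink code):
// why positional conversion breaks under projection pushdown.
public class ProjectionMismatchDemo {

    // Mimics the current converter loop: output field i is read from
    // record position i, which is only correct when the projection
    // is a prefix of the full schema (f1, f1+f2, ...).
    static Object[] convertPositional(Object[] fullRecord, int arity) {
        Object[] row = new Object[arity];
        for (int i = 0; i < arity; ++i) {
            row[i] = fullRecord[i]; // reads the wrong source field for non-prefix projections
        }
        return row;
    }

    // Mimics a possible fix: map each projected output field back to
    // its position in the full record before converting.
    static Object[] convertProjected(Object[] fullRecord, int[] sourcePos) {
        Object[] row = new Object[sourcePos.length];
        for (int i = 0; i < sourcePos.length; ++i) {
            row[i] = fullRecord[sourcePos[i]];
        }
        return row;
    }

    public static void main(String[] args) {
        // All four fields, as the DataFileReader would return them.
        Object[] record = {"f1", "f2", "f3", "f4"};

        // SELECT f3: positional conversion silently reads f1 instead of f3.
        System.out.println(convertPositional(record, 1)[0]); // prints f1, not f3

        // With the mapping {2} (the position of f3), the right field is read.
        System.out.println(convertProjected(record, new int[] {2})[0]); // prints f3
    }
}
```

In the real converter, the mapping could presumably be derived from the reader schema (e.g., the field positions of the projected names), but the array version above captures the indexing problem.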
The Avro schema we used has 4 fields, namely f1, f2, f3 and f4. When using "SELECT *" or SELECT the first n fields (e.g., SELECT f1 or SELECT f1, f2) to read the table, it works fine. However, if we query an arbitrary field other than f1, a data & converter mismatch exception will show. After digging into the code, I figured out that `AvroFileFormatFactory` generates a `producedDataType` with projection push down. When generating AvroToRowDataConverters, it only considers the selected fields and generates converters for them. However, the records read by the DataFileReader contain all fields. Specifically, for the following code snippet from AvroToRowDataConverters, the fieldConverters contains only the converters for the selected fields but the record contains all fields which leads to a converters & data fields mismatch problem. That also explains why selecting the first n fields works (It's because the converters & data fields happen to match). ``` return avroObject -> { IndexedRecord record = (IndexedRecord) avroObject; GenericRowData row = new GenericRowData(arity); for (int i = 0; i < arity; ++i) { // avro always deserialize successfully even though the type isn't matched // so no need to throw exception about which field can't be deserialized row.setField(i, fieldConverters[i].convert(record.get(i))); } return row; }; ``` Not sure if any of you hit this before. If it's confirmed to be a bug, I'll file a ticket and try to fix it. Best, Xingcan