Hi all,

We tried to read some Avro files with the Flink SQL (1.16.1) and noticed
that the projection pushdown seems to be buggy.

The Avro schema we used has 4 fields, namely f1, f2, f3 and f4. When using
"SELECT *" or SELECT the first n fields (e.g., SELECT f1 or SELECT f1, f2)
to read the table, it works fine. However, if we query an arbitrary field
other than f1, a data & converter mismatch exception will show.

After digging into the code, I figured out that `AvroFileFormatFactory`
generates a `producedDataType` with projection push down. When generating
AvroToRowDataConverters, it only considers the selected fields and
generates converters for them. However, the records read by the
DataFileReader contain all fields.

Specifically, for the following code snippet from AvroToRowDataConverters,
the fieldConverters contains only the converters for the selected fields
but the record contains all fields which leads to a converters & data
fields mismatch problem. That also explains why selecting the first n
fields works (It's because the converters & data fields happen to match).

```
return avroObject -> {
      IndexedRecord record = (IndexedRecord) avroObject;
      GenericRowData row = new GenericRowData(arity);
      for (int i = 0; i < arity; ++i) {
        // avro always deserialize successfully even though the type isn't
matched
        // so no need to throw exception about which field can't be
deserialized
        row.setField(i, fieldConverters[i].convert(record.get(i)));
      }
      return row;
    };
```

Not sure if any of you hit this before. If it's confirmed to be a bug, I'll
file a ticket and try to fix it.

Best,
Xingcan

Reply via email to