After rechecking it, I realized that some of my changes broke the expected
schema passed to GenericDatumReader#getResolver. The logic in the Flink
codebase is okay, and we should only read a portion of the Avro record.
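
For context, here is a minimal sketch of reading only a subset of fields via Avro's schema resolution, which is what "read a portion of the Avro record" amounts to. The file name "data.avro", the record name "Rec", and the field types are hypothetical; the reader schema's record name must match the writer's for resolution to apply:

```
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ProjectedAvroRead {
    public static void main(String[] args) throws IOException {
        // Projected reader schema containing only f2 and f3
        // (field names and types here are hypothetical).
        Schema readerSchema = SchemaBuilder.record("Rec").fields()
                .requiredString("f2")
                .requiredString("f3")
                .endRecord();

        // Passing the projection as the *reader* schema makes Avro resolve
        // it against the file's writer schema and skip the other fields.
        GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(null, readerSchema);
        try (DataFileReader<GenericRecord> fileReader =
                new DataFileReader<>(new File("data.avro"), datumReader)) {
            for (GenericRecord record : fileReader) {
                // Each record now only exposes the projected fields.
                System.out.println(record.get("f2") + " / " + record.get("f3"));
            }
        }
    }
}
```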

Thanks, Xingcan

On Sun, Aug 6, 2023 at 2:31 PM liu ron <ron9....@gmail.com> wrote:

> Hi, Xingcan
>
> After a deep dive into the source code, I also think it is a bug.
>
> Best,
> Ron
>
> Xingcan Cui <xingc...@gmail.com> wrote on Sat, Aug 5, 2023 at 23:27:
>
> > Hi all,
> >
> > We tried to read some Avro files with Flink SQL (1.16.1) and noticed
> > that projection pushdown seems to be buggy.
> >
> > The Avro schema we used has 4 fields, namely f1, f2, f3, and f4. When
> > using "SELECT *" or selecting the first n fields (e.g., SELECT f1 or
> > SELECT f1, f2) to read the table, it works fine. However, if we query an
> > arbitrary field other than f1 (e.g., SELECT f3), a data & converter
> > mismatch exception is thrown.
> >
> > After digging into the code, I figured out that `AvroFileFormatFactory`
> > generates a `producedDataType` with projection pushdown applied. When
> > generating `AvroToRowDataConverters`, it only considers the selected
> > fields and generates converters for them. However, the records read by
> > the `DataFileReader` contain all fields.
> >
> > Specifically, in the following code snippet from
> > `AvroToRowDataConverters`, `fieldConverters` contains only the converters
> > for the selected fields, but the record contains all fields, which leads
> > to a converters & data fields mismatch. That also explains why selecting
> > the first n fields works: the converters and data fields happen to line
> > up by index.
> >
> > ```
> > return avroObject -> {
> >     IndexedRecord record = (IndexedRecord) avroObject;
> >     GenericRowData row = new GenericRowData(arity);
> >     for (int i = 0; i < arity; ++i) {
> >         // avro always deserializes successfully even though the type
> >         // isn't matched, so no need to throw an exception about which
> >         // field can't be deserialized
> >         row.setField(i, fieldConverters[i].convert(record.get(i)));
> >     }
> >     return row;
> > };
> > ```
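> >
> > For example, with SELECT f3 the fieldConverters array has a single entry
> > (the f3 converter), but record.get(0) returns f1's value, so the f3
> > converter is applied to f1's data. Just to illustrate the index
> > realignment (not an actual patch; selectedFieldNames is a hypothetical
> > array holding the projected column names), one could look each selected
> > field up by its position in the record's full schema:
> >
> > ```
> > return avroObject -> {
> >     IndexedRecord record = (IndexedRecord) avroObject;
> >     GenericRowData row = new GenericRowData(arity);
> >     for (int i = 0; i < arity; ++i) {
> >         // Hypothetical sketch: resolve the selected field's position in
> >         // the record's full (writer) schema instead of reusing index i.
> >         int pos = record.getSchema().getField(selectedFieldNames[i]).pos();
> >         row.setField(i, fieldConverters[i].convert(record.get(pos)));
> >     }
> >     return row;
> > };
> > ```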
> >
> > Not sure if any of you have hit this before. If it's confirmed to be a
> > bug, I'll file a ticket and try to fix it.
> >
> > Best,
> > Xingcan
> >
>
