Alright, I figured it out—it's very similar to FLINK-13703, but instead of having to do with immutable fields, it's due to use of the Avro Gradle plugin option `gettersReturnOptional`.
With this option, the generated getters return Optional. It is most useful in combination with the option `optionalGettersForNullableFieldsOnly`, which restricts this to nullable fields. The presence of Optional-returning getters causes Flink's POJO analyzer to return null. I didn't run into this previously because I used both options and never had nullable fields in my schemas, so none of my getters returned Optional!

I don't suppose this would be considered a bug, but I'll leave a comment on the above issue.

--
Patrick Lucas

On Mon, Jun 14, 2021 at 5:06 PM Patrick Lucas <patr...@atomicwire.io> wrote:

> Hi,
>
> I have read [1] when it comes to using Avro for serialization, but I'm
> stuck with a mysterious exception when Flink is doing type resolution.
> (Flink 1.13.1)
>
> Basically, I'm able to use a SpecificRecord type in my source, but I am
> unable to use different SpecificRecord types later in the pipeline, getting
> an exception "Expecting type to be a PojoTypeInfo" from AvroTypeInfo[2].
>
> Let's say I have a schema "Foo" with one field "foo" of type "Bar", and
> schema "Bar" with one field "message" of type "string". My input data is a
> single Foo record of the form {"foo": {"message": "hi"}}.
>
> This works:
>
> env.fromElements(myInput).print();
>
> But this does not:
>
> env.fromElements(myInput).map(foo -> (Bar) foo.getFoo()).print();
>
> (nor does it work if I use a full MapFunction<Foo, Bar>)
>
> Does anyone know what I might be running into here? If necessary, I can
> put together a full reproducing example.
>
> [1]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
> [2]
> https://github.com/apache/flink/blob/release-1.13.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java#L72
>
> Thanks,
> Patrick
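
For anyone who finds this thread later, the getter mismatch described in the reply at the top can be sketched in plain Java. This is only an illustration, not Flink's actual analyzer code: it assumes a POJO-style check requires a getter whose return type equals the field's declared type. The classes `PlainBar` and `OptionalBar` and the helper `hasMatchingGetter` are hypothetical stand-ins for Avro-generated records and for that check.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Optional;

public class GetterCheckSketch {

    // Hypothetical stand-in for a record generated with plain getters.
    public static class PlainBar {
        private String message = "hi";

        public String getMessage() {
            return message;
        }
    }

    // Hypothetical stand-in for a record generated with gettersReturnOptional.
    public static class OptionalBar {
        private String message = "hi";

        public Optional<String> getMessage() {
            return Optional.ofNullable(message);
        }
    }

    // Assumed, simplified rule: a field counts as a valid POJO field only if
    // a public no-arg getter exists whose return type is exactly the field type.
    public static boolean hasMatchingGetter(Class<?> type, String fieldName) {
        try {
            Field field = type.getDeclaredField(fieldName);
            String getterName = "get"
                    + Character.toUpperCase(fieldName.charAt(0))
                    + fieldName.substring(1);
            Method getter = type.getMethod(getterName);
            return getter.getReturnType().equals(field.getType());
        } catch (ReflectiveOperationException e) {
            // Missing field or missing getter: treat as not a valid POJO field.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainBar: " + hasMatchingGetter(PlainBar.class, "message"));
        System.out.println("OptionalBar: " + hasMatchingGetter(OptionalBar.class, "message"));
    }
}
```

Under that assumed rule, the Optional-returning getter no longer matches the String field, so the class would not be accepted as a POJO, which is consistent with the "Expecting type to be a PojoTypeInfo" exception in the quoted message.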