Hi all,

In case it helps someone in the future: I was finally able to identify the problem and put together a fix that works for me.

As mentioned in earlier messages, Flink's default Avro format generates a reader schema from the table definition. That generated schema bakes in some assumptions, for example the record name defaults to "record" and nested field names are flattened with underscores. These assumptions did not match the Avro records produced on my side, which is what caused the deserialization failures.

I tried to avoid it, but in the end my solution was to create a new table format, which I named 'plain-avro'. Given a schema, it deserializes each record with a custom deserializer instead of the one generated from the table definition. Since my records come from Avro-generated classes, I simply used the schema embedded in those classes.

You can find a minimal example here:
https://github.com/yardenbm/flink-table-format-plain-avro

Hopefully this is useful for someone in the future.
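To give a flavor of the core idea, here is a minimal sketch of the decoding side only. The repo above contains the actual format (factory wiring, mapping to table rows, and so on); this just shows reading a record with the schema embedded in a generated class, using "Student" from my earlier message as a stand-in:

```
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.myexample.avro.generated.Student;

// Reads raw Avro bytes with the schema embedded in the generated class,
// rather than the schema Flink derives from the table definition.
public class PlainAvroDeserializer implements DeserializationSchema<Student> {

    // Avro readers are not serializable, so build them lazily on the task.
    private transient SpecificDatumReader<Student> reader;
    private transient BinaryDecoder decoder;

    @Override
    public Student deserialize(byte[] message) throws IOException {
        if (reader == null) {
            // getClassSchema() returns the schema the producer wrote with.
            reader = new SpecificDatumReader<>(Student.getClassSchema());
        }
        // Reuse the decoder across records to avoid per-record allocation.
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        return reader.read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(Student nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}
```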
Yarden

> Hi again,
>
> Let me add some clarifying details:
> My use case involves writing Avro records (events) to a Kafka topic from
> pipelineA using the DataStream API (without a schema registry, because of
> performance concerns).
> Then pipelineB uses the Table API, creates a table with the relevant
> fields (which are, again, nested), runs, let's say, a SELECT * on it, and
> writes the result to a second topic, "output-topic". This, however, does
> not work properly.
>
> I believe my problem is the serialization/deserialization mismatch
> between the DataStream API and the Table API.
> My actual Avro records carry the real record names (e.g. "Student") and
> namespace (e.g. com.myexample.avro.generated), while the Table API
> generates its own naming: the name is explicitly "record" and the
> namespace is explicitly "org.apache.flink.avro.generated". The same goes
> for nested fields, where the Table API generates names like
> "record_student_name".
>
> I am not sure if my use case is unique, so this functionality may simply
> not be supported (DataStream -> Table without a schema registry), but if
> that is not the case I would appreciate tips on how to achieve this
> transition :)
>
> Thanks again
>
> On Sunday, June 8, 2025 at 11:18 Yarden BenMoshe <yarde...@gmail.com>
> wrote:
>
>> Hi all,
>> I am trying to create a Flink SQL pipeline that consumes from a Kafka
>> topic containing plain Avro objects (no schema registry used).
>> As far as I can see in the docs, for plain Avro the schema (in a Flink
>> SQL context) is inferred from the table definition.
>>
>> My problem is that the schema I use represents a very large, nested
>> object, and I am hitting a deserialization issue that I cannot
>> understand. The error I get is very generic:
>>
>> ```
>> Caused by: org.apache.avro.InvalidNumberEncodingException: Invalid
>> int encoding
>> ```
>>
>> I manually created a table that should match the schema of the topic,
>> but I am stuck debugging where the issue comes from (or rather, which
>> field is causing problems).
>>
>> I was wondering if anyone has encountered a similar use case and has
>> any tips on how to better debug this.
>>
>> Thanks,
>> Yarden
>>
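And for anyone who hits the same InvalidNumberEncodingException: one way to narrow it down is to decode a single raw payload offline against a candidate writer schema, outside of Flink. A minimal sketch, assuming the generated Student class from above and one record's value bytes dumped to a local file (both are placeholders):

```
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Decodes one raw Kafka payload with a candidate writer schema. If this
// succeeds with the generated schema while the Flink table fails, the
// table-derived schema is the mismatch.
public final class AvroPayloadCheck {

    public static void main(String[] args) throws Exception {
        // One record's value bytes, dumped to a file with a small
        // consumer; "record.bin" is a placeholder path.
        byte[] payload = Files.readAllBytes(Paths.get("record.bin"));

        Schema writerSchema =
                com.myexample.avro.generated.Student.getClassSchema();
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(writerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);

        // Throws (e.g. InvalidNumberEncodingException) as soon as the
        // bytes stop matching the schema, which narrows down the field.
        System.out.println(reader.read(null, decoder));
    }
}
```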