Hi all,
In case it helps someone in the future, I wanted to share that I was
finally able to identify the problem and create a fix that works for me.
As mentioned in earlier messages, the default Avro behavior in Flink
generates a schema based on the table definition. This includes some
assumptions — for example, the record name defaults to "record" and field
names are flattened with underscores. These assumptions didn’t match the
Avro records produced on my side, which caused deserialization issues.
I tried to avoid it but ultimately my solution involved creating a new
format which I named 'plain-avro', and given a schema it uses a custom
deserializer for each record, instead of the generated one. Since my
classes are generated classes I simply used the generated schema.
You can find a minimal example here:
https://github.com/yardenbm/flink-table-format-plain-avro
Hopefully this can be useful for someone in the future

Yarden

‫

> Hi again,
>
> Let me add some clarifying details:
> My use case involves writing Avro records (events) to a kafka topic from
> pipelineA using DataStream API (without schema registry, because of
> performance concerns).
> Then, I have pipelineB which uses Table API, creates a table with the
> relevant fields (which are again, nested) and lets say SELECT * from it and
> outputs to a 2nd topic "output-topic". This however does not work properly.
>
> I believe that my problem is with serialization/deserialization between
> DataStream API & Table API.
> My actual avro records are with actual namings of the records (e.g.
> "Student") and namespace (e.g. com.myexample.avro.generated), while on
> Table API naming convention is generated such as name is explicitly
> "record" and namespace is explicitly "org.apache.flink.avro.generated". The
> same goes for nested fields, where Table API generates namings like
> "record_student_name".
>
> I am not sure if my use case is unique, hence this functionality may not
> be supported (DataStream -> Table without SR), but if that's not the case I
> would appreciate tips on how to achieve this transition :)
>
> Thanks again
>
> ‫בתאריך יום א׳, 8 ביוני 2025 ב-11:18 מאת ‪Yarden BenMoshe‬‏ <‪
> yarde...@gmail.com‬‏>:‬
>
>> Hi all,
>> I am trying to create a Flink SQL pipeline that will consume from a kafka
>> topic that contains plain avro objects (no schema registry used).
>> As I can see in the docs, for plain avro, the schema (in flink sql
>> context) will be inferred from the table definition.
>>
>> My problem is that the schema I use represents a very large, nested
>> object, and I have deserialization issue that I cannot understand. The
>> error i get is a very generic
>>
>> ```
>>     Caused by: org.apache.avro.InvalidNumberEncodingException: Invalid
>> int encoding
>> ```
>>
>> I manually created the table that should match the schema of the topic,
>> but stuck with debugging where the issue comes from (or exactly, which
>> field is causing problems).
>>
>> I was wondering if anyone encountered a similar use case and have any
>> tips on how to better debug this.
>>
>> Thanks,
>> Yarden
>>
>

Reply via email to