Thanks for such a nice explanation.

  1.  Kryo is no longer used, as per the attached classes (custom type information, 
custom type serializer, snapshot, etc.). Can you please have a look and let me 
know if any reflection-based serialization is still involved? It is working on 
the Flink cluster.

Also, at the start of the program I intentionally call 
env.getConfig().disableGenericTypes(); to disable the fallback to Kryo.



  2.  This is currently persisting the complete GenericRecord. As I understand it, 
when GenericDatumWriter is used it persists only the fields as per the schema and 
not the schema itself, because Kryo is not used here (see the small check after 
this list).


  3.  I am a little confused about the snapshot change you called out below for 
different Avro schemas (product, customer, order, etc.): why does the serializer 
snapshot need to persist the schemas?


  4.  Also, yes, we are using GenericRecord between the source and the sink as 
well, and we also have a custom schema registry integration. Attached is a dummy 
program that simulates, to some extent, what is done in the actual application.
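
Regarding point 2, here is a tiny self-contained check of that claim (the schema 
and field names are made up for the example): GenericDatumWriter with a binary 
encoder writes only the field values, never the schema, so a record with a single 
short string field serializes to just a few bytes.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class PayloadOnlyCheck {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "abc");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Prints 4: a 1-byte length prefix plus the 3 bytes of "abc".
        // Neither the schema nor the field names appear in the output.
        System.out.println(out.toByteArray().length);
    }
}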

From: Schwalbe Matthias <[email protected]>
Sent: 23 September 2025 17:07
To: Kamal Mittal <[email protected]>; [email protected]
Subject: RE: Flink kafka source with Avro Generic Record

Hi Kamal,

Interesting topic 😊

The answer really depends on what you exactly want to do.


  *   GenericRecord always contains the AVRO schema; even if serialization by Kryo 
were possible, it would still serialize the (large) schema with each record
  *   Hence you need to arrange things such that the schema does not end up in 
the serialized record
  *   We use AVRO GenericRecord in just one map() operator before the sink (i.e. 
not for state etc.); we transfer the GenericRecord from the map() operator to 
the sink, but
  *   reconfigure the chaining strategy, setChainingStrategy(ChainingStrategy.ALWAYS), 
on the operator such that the GenericRecords don’t get serialized between the 
map() and the sink (see the sketch after this list)
  *   We don’t work with GenericRecord in any place other than sources and sinks
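
Below is a minimal, self-contained sketch of the pattern described above; the 
one-field schema is made up, print() stands in for the real sink, and it needs 
the flink-avro dependency for GenericRecordAvroTypeInfo. The GenericRecord is 
produced only in the last map() before the sink; with the map() and the sink 
chained into one task the record is handed over inside the task rather than 
serialized across an operator boundary, and the returns(...) hint makes Flink 
use AVRO instead of the Kryo fallback wherever a serializer is still needed.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainedGenericRecordSketch {

    // Hypothetical one-field schema, only for this example.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            // GenericRecord is created only in the last map() before the sink.
            .map(new MapFunction<String, GenericRecord>() {
                private transient Schema schema;

                @Override
                public GenericRecord map(String value) {
                    if (schema == null) {
                        schema = new Schema.Parser().parse(SCHEMA_JSON);
                    }
                    GenericRecord record = new GenericData.Record(schema);
                    record.put("name", value);
                    return record;
                }
            })
            // AVRO-based type information: no Kryo fallback for GenericRecord.
            .returns(new GenericRecordAvroTypeInfo(new Schema.Parser().parse(SCHEMA_JSON)))
            // Stand-in for the real sink; map() and sink stay chained by default.
            .print();

        env.execute("chained-generic-record-sketch");
    }
}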


  *   From your description I assume you use GenericRecord in other places than 
sources/sinks
  *   That means you need to have/create a TypeInformation[GenericRecord] that 
contains the schema in use and serializes by means of AVRO, plus a 
TypeSerializerSnapshot that persists the AVRO schema
  *   That is some 50 lines of code
  *   However, your description also indicates that you use AVRO to support 
different event types in the same place, like a coproduct (i.e. sum type, enum 
type)?!
  *   In that case the TypeInformation needs to be a little more complicated: the 
serializer snapshot needs to persist all AVRO schemas together with a respective 
invariant tag, and
  *   The TypeSerializer needs to store the respective tag before the serialized 
AVRO record is stored, and on deserialization the other way around: load the 
tag, look up the schema, and use the schema for deserialization
  *   All of that is not rocket science, just a bit elaborate (a sketch of the 
tag-based write/read logic follows below)
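
To make the tag-based approach concrete, here is a rough sketch of just the 
write/read logic that would sit inside the TypeSerializer's 
serialize()/deserialize() methods. The class name and the tag-to-schema maps are 
hypothetical, and the surrounding TypeInformation/TypeSerializer/ 
TypeSerializerSnapshot boilerplate is omitted.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class TaggedAvroCodec {

    // Hypothetical stable mapping, e.g. product=1, customer=2, order=3.
    private final Map<Integer, Schema> schemasByTag;
    private final Map<String, Integer> tagsByRecordName;

    public TaggedAvroCodec(Map<Integer, Schema> schemasByTag,
                           Map<String, Integer> tagsByRecordName) {
        this.schemasByTag = schemasByTag;
        this.tagsByRecordName = tagsByRecordName;
    }

    /** Writes [tag][AVRO binary payload]; the schema itself is never written. */
    public byte[] write(GenericRecord record) throws IOException {
        int tag = tagsByRecordName.get(record.getSchema().getFullName());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(tag);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(data, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    /** Reads the tag first, looks up the schema, then decodes the AVRO payload. */
    public GenericRecord read(byte[] bytes) throws IOException {
        DataInputStream data = new DataInputStream(new ByteArrayInputStream(bytes));
        int tag = data.readInt();
        Schema schema = schemasByTag.get(tag);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}

The TypeSerializerSnapshot then only needs to persist the tag-to-schema mapping 
(for example as pairs of tag and schema JSON), so that state written earlier can 
still be read back with the original writer schemas after the job is upgraded.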



Hope that helps

Thias



From: Kamal Mittal via user 
<[email protected]<mailto:[email protected]>>
Sent: Tuesday, September 23, 2025 5:05 AM
To: [email protected]<mailto:[email protected]>
Subject: [External] RE: Flink kafka source with Avro Generic Record


Can someone please give input on the below?

From: Kamal Mittal <[email protected]<mailto:[email protected]>>
Sent: 22 September 2025 17:16
To: Kamal Mittal <[email protected]<mailto:[email protected]>>; 
[email protected]<mailto:[email protected]>
Subject: RE: Flink kafka source with Avro Generic Record

Hello,

I tried this and Flink fails later, when it tries to serialize/deserialize the 
GenericRecord object for communication between operators (e.g. from map() to 
another operator, or writing checkpoints, or shuffling). It is a serialization 
issue during operator chaining or data exchange in Flink’s runtime.

Probable reason:
GenericRecord from Avro holds schema metadata internally, which includes 
unmodifiable maps, especially:
schema (org.apache.avro.generic.GenericData$Record)
  ↳ fieldMap (org.apache.avro.Schema$RecordSchema)
     ↳ reserved (java.util.Collections$UnmodifiableMap)
These types (like UnmodifiableMap) are not easily serializable by Kryo, which 
Flink falls back to if:

  *   No proper TypeInformation or TypeSerializer is provided.
  *   Flink cannot infer a more optimized serializer (see the small check after 
the error stack below).

Error stack:
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
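
For reference, a small standalone check of the fallback described above, plus the 
guard that makes it fail fast when the job graph is built instead of with a 
KryoException at runtime (the class name is made up; the Flink calls are standard):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoFallbackCheck {
    public static void main(String[] args) {
        // Without a dedicated TypeInformation, GenericRecord resolves to a generic type;
        // generic types are serialized with Kryo, which is what fails in the trace above.
        TypeInformation<GenericRecord> info = TypeInformation.of(GenericRecord.class);
        System.out.println(info); // expected: GenericType<org.apache.avro.generic.GenericRecord>

        // Guard: with generic types disabled, any operator whose output resolves to such
        // a generic type is rejected when the job graph is built, so the problem surfaces
        // before any runtime KryoException.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();
    }
}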

Can you please confirm the above understanding, and also a possible way to 
resolve this? A custom serializer and custom type information solution is 
probably needed here; is that recommended?

Rgds,
Kamal
From: Kamal Mittal via user 
<[email protected]<mailto:[email protected]>>
Sent: 22 September 2025 13:56
To: [email protected]<mailto:[email protected]>
Subject: Flink kafka source with Avro Generic Record

Hello,

I need to support a Flink application accepting Avro binary events with different 
schemas over the Flink Kafka source. I need to use a custom schema registry to 
fetch the schema at runtime and decode the incoming event.

I will use Avro GenericRecord to decode incoming events with different Avro 
schemas.
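
What I have in mind for the decode step is roughly the sketch below (not the 
actual application code): a DeserializationSchema that turns the Kafka bytes 
into GenericRecord, assuming a single reader schema passed in as JSON; the 
custom schema registry lookup and the multi-schema case are left out here. 
getProducedType() returns flink-avro's GenericRecordAvroTypeInfo so that 
downstream exchanges use Avro rather than falling back to Kryo.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class GenericRecordDeserializationSchema implements DeserializationSchema<GenericRecord> {

    private final String schemaJson;   // kept as a String so it is easy to ship with the job
    private transient Schema schema;
    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public GenericRecordDeserializationSchema(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(InitializationContext context) {
        schema = new Schema.Parser().parse(schemaJson);
        reader = new GenericDatumReader<>(schema);
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        return reader.read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Avro-based type information so downstream operators do not fall back to Kryo.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaJson));
    }
}

Such a schema could then be plugged into the Kafka source, for example via 
KafkaSource.builder()...setValueOnlyDeserializer(new GenericRecordDeserializationSchema(schemaJson)).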

I have gone through the page Flink Serialization Tuning Vol. 1: Choosing your 
Serializer — if you can | Apache 
Flink<https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#avro-generic>.

Can you please tell me: since the schema is not available at compile/job-graph 
time, will Flink use Kryo as the serializer? Also, can anything be done here to 
improve this, given that Kryo impacts performance?

Rgds,
Kamal

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Attachment: FlinkGenericRecordTypeSafeJob.java
Description: FlinkGenericRecordTypeSafeJob.java

Attachment: DynamicGenericRecordSerializer.java
Description: DynamicGenericRecordSerializer.java

Attachment: DynamicGenericRecordTypeInfo.java
Description: DynamicGenericRecordTypeInfo.java

Attachment: DynamicGenericRecordSerializerSnapshot.java
Description: DynamicGenericRecordSerializerSnapshot.java
