Hi flink-users! I need advice about how to tackle a programming problem I’m 
facing. I have a bunch of jobs that look something like this sketch
Source<GenericRecord> kafkaSource;
kafkaSource
    .map(fn1)          // MapFunction<GenericRecord, GenericRecord>
    .map(fn2)
    ...
    .sink(kafkaSink)   // Kafka sink that writes GenericRecords
The reason we represent data as GenericRecords is that the Avro schemas in 
play vary at runtime; we write the schema descriptions to a separate topic. 
We're aware of the performance penalty of passing GenericRecords between 
operators and to/from Kafka, so we wrote our own Kafka serialization schema 
and Kryo serializer for GenericRecords. The tricky part is that our custom 
serializer needs to know the current list of schemas so it can figure out how 
to ser/de messages as they pass through the graph.
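To make the registry dependency concrete: one common wire format for this kind of serializer (an assumption about ours, not a description of it) is to tag each record with the id of the schema it was written with, and resolve that id against the registry at read time. A Flink- and Avro-free sketch of just that idea, where the String-valued registry stands in for a map of real org.apache.avro.Schema objects and the payload would really be encoded with GenericDatumWriter/GenericDatumReader:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical serde: schema id -> schema text. A real version would hold
// org.apache.avro.Schema values and Avro-encode/decode the payload.
class SchemaTaggedSerde {
    private final Map<Integer, String> registry = new ConcurrentHashMap<>();

    public void register(int id, String schema) {
        registry.put(id, schema);
    }

    // Prefix every record with the 4-byte id of the schema it was written with.
    public byte[] serialize(int schemaId, byte[] payload) {
        if (!registry.containsKey(schemaId)) {
            throw new IllegalStateException("unknown schema id " + schemaId);
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(schemaId).put(payload);
        return buf.array();
    }

    // Read the id back, resolve the schema from the registry, then decode
    // the remaining bytes with it (here: just pair them up as a string).
    public String deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int schemaId = buf.getInt();
        String schema = registry.get(schemaId);
        if (schema == null) {
            throw new IllegalStateException("unknown schema id " + schemaId);
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return schema + ":" + new String(payload, StandardCharsets.UTF_8);
    }
}
```

Whatever the exact format, the point is the same: both serialize and deserialize dead-end without an up-to-date registry, which is the thing I can't find a clean way to feed in.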
I can't for the life of me figure out how to pass this info into our 
serializers in a sane way. The methods I'm aware of are:
1.      A static field somewhere that polls an external system for the list of 
schemas. We already do this, but we think it will cause class loader leaks, 
since the polling thread is created inside the custom serializer and it's not 
clear where we should cancel it.
2.      Broadcast state. We could try to stream the schemas around our graph 
using broadcast state, but that makes these jobs much less ergonomic to write: 
every single operator in the graph has to receive the broadcast state and 
handle serialization internally instead of relying on our custom serializer.
3.      A static field which is populated by the Kafka stream. This avoids the 
thread leak of (1), but I don't think Flink guarantees the static field gets 
populated in every task slot; it's hard to control where the Kafka stream 
will be processed.
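On the leak in (1): the part that seems to matter is giving the polling thread an explicit lifecycle, so it can be stopped from an operator's close()/teardown rather than living forever inside the serializer. A Flink-free sketch of what I mean, where refreshSchemas is a placeholder for whatever call fetches the current schema list:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical poller with an explicit lifecycle: whoever creates it is
// responsible for calling close(), e.g. from RichFunction#close().
class SchemaPoller implements AutoCloseable {
    private final ScheduledExecutorService executor;
    private final AtomicReference<String> latest = new AtomicReference<>();

    public SchemaPoller(Supplier<String> refreshSchemas, long periodMillis) {
        // A daemon thread at least won't block JVM shutdown, but it still
        // pins the user class loader until the executor is shut down.
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "schema-poller");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleAtFixedRate(() -> latest.set(refreshSchemas.get()),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    public String latest() {
        return latest.get();
    }

    public boolean isRunning() {
        return !executor.isShutdown();
    }

    @Override
    public void close() {
        executor.shutdownNow();  // stops the thread, releasing the class loader
    }
}
```

The open question is still which hook owns the close() call when the poller is reached from a serializer rather than from an operator.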
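To make the ergonomic cost of (2) concrete, here is roughly what every operator in the graph would turn into. This is an uncompiled API sketch: records and schemaEvents stand for our data and schema-topic streams, and SchemaEvent is a placeholder type for the schema-topic messages.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder for a message on our schema topic.
class SchemaEvent {
    String name;
    String schemaJson;
}

class SchemaBroadcastSketch {
    static DataStream<GenericRecord> withSchemas(
            DataStream<GenericRecord> records,
            DataStream<SchemaEvent> schemaEvents) {
        MapStateDescriptor<String, String> schemasDesc =
                new MapStateDescriptor<>("schemas", Types.STRING, Types.STRING);
        BroadcastStream<SchemaEvent> broadcast = schemaEvents.broadcast(schemasDesc);

        return records.connect(broadcast)
            .process(new BroadcastProcessFunction<GenericRecord, SchemaEvent, GenericRecord>() {
                @Override
                public void processBroadcastElement(SchemaEvent e, Context ctx,
                        Collector<GenericRecord> out) throws Exception {
                    // Every parallel instance sees every schema update.
                    ctx.getBroadcastState(schemasDesc).put(e.name, e.schemaJson);
                }

                @Override
                public void processElement(GenericRecord r, ReadOnlyContext ctx,
                        Collector<GenericRecord> out) throws Exception {
                    // The operator now has to consult the broadcast state
                    // itself instead of leaning on the registered serializer.
                    out.collect(r);
                }
            });
    }
}
```

Wrapping every stage of every job in this connect/process boilerplate is what I meant by "much less ergonomic".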
I know it's a complicated situation, so I hope it came across clearly. I feel 
pretty stumped, as none of the solutions I've considered seems adequate. Is 
there another option I haven't thought of? Is there a better way to manage a 
dynamic set of Avro schemas without restarting the job? Would love any advice! 
Thanks!

SO: 
https://stackoverflow.com/questions/64054753/live-updating-serialization-schemas-in-flink
