Hi flink-users! I need advice about how to tackle a programming problem I’m
facing. I have a bunch of jobs that look something like this sketch:

    Source<GenericRecord> kafkaSource;

    kafkaSource
        .map(/* function that takes a GenericRecord */)
        .map(/* ... */)
        // ...
        .sink(/* Kafka sink that takes in GenericRecords */);

The reason we represent data as GenericRecords is that the Avro schemas in
play change at runtime. We write schema descriptions to a separate topic.
We're aware of the performance penalty of passing GenericRecords between
operators and to/from Kafka, so we wrote our own Kafka serialization schema
and Kryo serializer for GenericRecords. The tricky part is that our custom
serializer needs to know the current list of schemas so it can figure out how
to serialize and deserialize messages as they pass through the graph.
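
To illustrate why the serializer depends on the schema list, here is a
minimal, self-contained sketch. It assumes each message is framed as a 4-byte
schema id followed by the encoded payload. That framing, the class name
TaggedRecordCodec, and the schemaLookup function are hypothetical stand-ins,
and a plain schema string stands in for an Avro Schema so the sketch has no
dependencies.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

// Hypothetical codec: frames each record as [4-byte schema id][payload bytes].
final class TaggedRecordCodec {
    // Assumed lookup from schema id to schema definition; returns null if unknown.
    private final IntFunction<String> schemaLookup;

    TaggedRecordCodec(IntFunction<String> schemaLookup) {
        this.schemaLookup = schemaLookup;
    }

    // Prefixes the payload with its schema id, refusing ids we have not seen.
    byte[] serialize(int schemaId, byte[] payload) {
        requireSchema(schemaId);
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // Reads the schema id, resolves the schema, and returns it with the payload.
    Decoded deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int schemaId = buf.getInt();
        String schema = requireSchema(schemaId);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new Decoded(schema, payload);
    }

    // Fails loudly when the current schema list does not contain the id.
    private String requireSchema(int schemaId) {
        String schema = schemaLookup.apply(schemaId);
        if (schema == null) {
            throw new IllegalStateException("Unknown schema id " + schemaId);
        }
        return schema;
    }

    // Result of deserialization: the resolved schema plus the raw payload.
    static final class Decoded {
        final String schema;
        final byte[] payload;

        Decoded(String schema, byte[] payload) {
            this.schema = schema;
            this.payload = payload;
        }
    }
}
```

Both directions fail without an up-to-date lookup, which is why the schema
list has to be visible wherever the serializer runs.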

I can't for the life of me figure out how to pass this information into our
serializers in a sane way. The methods I'm aware of are:

1. A static field somewhere that polls an external system for the list of
   schemas. We already do this, but we think it will cause class loader
   leaks, since the polling thread is created inside the custom serializer
   and it's not clear where we should cancel it.
2. Broadcast state. We could try to stream the schemas around our graph
   using broadcast state, but that would make these jobs much less ergonomic
   to write: every single operator in the graph would have to receive the
   broadcast state and handle serialization internally instead of using our
   custom serializer.
3. A static field that is populated by the Kafka stream. This avoids the
   thread leak of (1), but I don't think Flink guarantees that this static
   field gets populated in every task slot. It's hard to control where the
   Kafka stream will be processed.
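
For concreteness, approach (1) has roughly the following shape. This is a
simplified, self-contained sketch rather than the real code: the class name
SchemaCache, the fetchSchemas supplier (standing in for whatever external
system is polled), and the string-to-string map are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of approach (1): a static cache refreshed by a polling thread.
final class SchemaCache {
    // Latest snapshot of schema id -> schema definition, replaced on every poll.
    private static volatile Map<String, String> schemas = Map.of();
    // The polling thread; it holds a reference into the class loader that created it.
    private static ScheduledExecutorService poller;

    private SchemaCache() {}

    // Loads the schemas once, then starts the poller. Later calls are no-ops.
    static synchronized void start(Supplier<Map<String, String>> fetchSchemas,
                                   long periodMillis) {
        if (poller != null) {
            return;
        }
        schemas = Map.copyOf(fetchSchemas.get());
        poller = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "schema-poller");
            thread.setDaemon(true);
            return thread;
        });
        poller.scheduleAtFixedRate(() -> {
            try {
                schemas = Map.copyOf(fetchSchemas.get());
            } catch (RuntimeException e) {
                // Keep the previous snapshot; an uncaught exception would cancel polling.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Called by the serializer to resolve a schema; returns null if unknown.
    static String lookup(String schemaId) {
        return schemas.get(schemaId);
    }

    // Stops the polling thread. Something has to call this when the job ends.
    static synchronized void stop() {
        if (poller != null) {
            poller.shutdownNow();
            poller = null;
        }
    }
}
```

The sketch makes the problem visible: start() can be triggered lazily from
the serializer, but nothing in a serializer's lifecycle is an obvious place
to call stop(), so the thread can outlive the job that created it.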

I know it's a complicated situation, so I hope it came across clearly. I feel
pretty stumped, as none of the solutions I've considered seem adequate. Is
there another option I haven't thought of? Is there a better way to manage a
dynamic set of Avro schemas without restarting the job? Would love any advice!
Thanks!

Also posted on Stack Overflow:
https://stackoverflow.com/questions/64054753/live-updating-serialization-schemas-in-flink