Hello! I'm looking for some guidance on a problem with the Flink formats used for Kafka.
Flink currently uses common serde interfaces across all formats. However, some data formats used with Kafka need the record headers for serialization and deserialization. The problem is the same in both directions, so I'll use DynamicKafkaDeserializationSchema <https://github.com/Shopify/shopify-flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L130> as an example. It has access to the Kafka record headers, but it can't pass them to the DeserializationSchema <https://github.com/apache/flink/blob/94b55d1ae61257f21c7bb511660e7497f269abc7/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java#L81> implemented by the format, because that interface is generic and takes only the message bytes.

If it were possible to pass the headers, open source formats such as Apicurio could be supported. Unlike the Confluent formats, which store the metadata (schema ID) prepended to the serialized bytes of the key and value, the Apicurio formats store their metadata in the record headers.

I have bandwidth to work on this, but it would be great to have direction from the community. I have a simple working prototype that loads a custom version of the format with a modified interface that accepts the headers (for simplicity, I just pass the entire Apache Kafka ConsumerRecord <https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html> / ProducerRecord <https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>).

The issue I foresee is that the class loader <https://github.com/apache/flink/blob/94b55d1ae61257f21c7bb511660e7497f269abc7/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java> lives in the Flink repo along with the interfaces for the formats, but these changes are specific to Kafka.
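
To make the idea concrete, a header-aware variant of the interface might look roughly like the sketch below. This is illustrative only and is not the prototype code: `HeaderAwareDeserializationSchema`, `DecodedRecord`, the `example.schema.id` header name, and the plain `Map` standing in for Kafka's headers are all hypothetical, chosen so the snippet compiles without Flink or Kafka on the classpath. It contrasts a format that reads the schema ID from the start of the payload (magic byte plus 4-byte ID, as in the Confluent wire format) with one that reads it from a record header.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical header-aware counterpart to a bytes-only deserialization interface. */
interface HeaderAwareDeserializationSchema<T> {
    // The Map is a stand-in for Kafka record headers (assumption for this sketch).
    T deserialize(byte[] value, Map<String, byte[]> headers);
}

/** Hypothetical result type: the resolved schema ID plus the remaining payload. */
final class DecodedRecord {
    final long schemaId;
    final byte[] payload;

    DecodedRecord(long schemaId, byte[] payload) {
        this.schemaId = schemaId;
        this.payload = payload;
    }
}

/** Schema ID travels inside the value: magic byte 0, 4-byte big-endian ID, then data. */
final class InlineIdSchema implements HeaderAwareDeserializationSchema<DecodedRecord> {
    @Override
    public DecodedRecord deserialize(byte[] value, Map<String, byte[]> headers) {
        ByteBuffer buf = ByteBuffer.wrap(value); // big-endian by default
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        int schemaId = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new DecodedRecord(schemaId, payload); // headers are not needed here
    }
}

/** Schema ID travels in a record header, so the bytes-only interface cannot see it. */
final class HeaderIdSchema implements HeaderAwareDeserializationSchema<DecodedRecord> {
    // Hypothetical header name, not the name any real registry uses.
    static final String SCHEMA_ID_HEADER = "example.schema.id";

    @Override
    public DecodedRecord deserialize(byte[] value, Map<String, byte[]> headers) {
        byte[] raw = headers.get(SCHEMA_ID_HEADER);
        if (raw == null || raw.length != Long.BYTES) {
            throw new IllegalArgumentException("Missing or malformed schema ID header");
        }
        long schemaId = ByteBuffer.wrap(raw).getLong();
        return new DecodedRecord(schemaId, value); // the value is the payload as-is
    }
}

final class HeaderSketchDemo {
    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(HeaderIdSchema.SCHEMA_ID_HEADER,
                ByteBuffer.allocate(Long.BYTES).putLong(42L).array());
        DecodedRecord rec = new HeaderIdSchema().deserialize(new byte[] {1, 2, 3}, headers);
        System.out.println("schemaId=" + rec.schemaId + " payloadLength=" + rec.payload.length);
    }
}
```

Passing only the headers, rather than the whole ConsumerRecord, would keep such an interface free of Kafka types, which may matter for where the interface can live.
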
This solution could require migrating the formats to the flink-connector-kafka repo, which is a decent amount of work.

Feedback is appreciated!

Thanks,
Balint