Reading from a Kafka topic with custom serialization/deserialization can be done with a KafkaSource configured with an implementation of KafkaRecordDeserializationSchema. That interface receives the whole ConsumerRecord, so it also has access to the Kafka headers, which I use to check message integrity. How can I do the same with the Table API, where value.format only accepts a string naming one of a predefined set of formats?
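
To make the Table API side concrete, this is roughly the kind of table definition I mean. It is only a sketch: the table name, columns, topic, group id, and broker address are placeholders, and 'json' stands in for whichever built-in format is used.

```sql
-- Hypothetical table: names, columns, topic, and broker address are placeholders.
CREATE TABLE events (
  id STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'example-group',
  'scan.startup.mode' = 'earliest-offset',
  -- value.format takes the identifier of a registered format,
  -- not a deserializer class as with KafkaSource.
  'value.format' = 'json'
);
```

Here the format is selected purely by its string identifier, so there is no obvious place to plug in the header-based integrity check.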
Thanks.