Hi All,

Thanks to Gary, I recently came upon an interesting cluster of issues:

 - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer to skip corrupted messages
 - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible error handling in the Kafka consumer
 - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema handle message record which value is null
In light of the last one, I'd like to look again at the first two. They introduced behaviour where the Kafka consumer (and possibly also the Kinesis consumer) silently drops a record when the deserialisation schema returns NULL. In Kafka, however, NULL values have semantic meaning: they usually encode a DELETE for the key of the message. If SimpleStringSchema returned that null, our consumer would silently drop it and we would lose that DELETE message. That doesn't seem right.

I think the right solution for null handling is to introduce a custom record type that encodes both Kafka NULL values and the possibility of a corrupt message that cannot be deserialised, something like an Either type. It's then up to the application to handle those cases.

Concretely, I want to discuss whether we should change our consumers to not silently drop null records but instead treat them as errors. For FLINK-11820, the solution is for users to write their own custom schema that handles null values and returns a user-defined type that signals a null value (see the rough sketch at the end of this mail).

What do you think?

Aljoscha
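
P.S. To make this more concrete, here is a rough sketch of what such a wrapper type and a null-aware schema could look like. This is not existing API; the names MaybeRecord and NullTolerantStringSchema are made up for illustration, and only DeserializationSchema itself is real Flink API.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical Either-like wrapper: a record is either a deserialised
// value, a Kafka NULL value (a DELETE for the message key), or a corrupt
// message that could not be deserialised.
final class MaybeRecord<T> implements java.io.Serializable {
    enum Kind { VALUE, NULL_VALUE, CORRUPT }

    private final Kind kind;
    private final T value; // set only when kind == VALUE

    private MaybeRecord(Kind kind, T value) {
        this.kind = kind;
        this.value = value;
    }

    static <T> MaybeRecord<T> value(T value) {
        return new MaybeRecord<>(Kind.VALUE, value);
    }

    static <T> MaybeRecord<T> nullValue() {
        return new MaybeRecord<>(Kind.NULL_VALUE, null);
    }

    static <T> MaybeRecord<T> corrupt() {
        return new MaybeRecord<>(Kind.CORRUPT, null);
    }

    Kind kind() { return kind; }
    T get() { return value; }
}

// Hypothetical user-defined schema for the FLINK-11820 case: instead of
// returning null (which the consumer silently drops today), it maps a NULL
// Kafka value to an explicit NULL_VALUE record, so the application still
// sees the DELETE and can decide what to do with it.
final class NullTolerantStringSchema implements DeserializationSchema<MaybeRecord<String>> {

    @Override
    public MaybeRecord<String> deserialize(byte[] message) {
        if (message == null) {
            return MaybeRecord.nullValue();
        }
        return MaybeRecord.value(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(MaybeRecord<String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MaybeRecord<String>> getProducedType() {
        return TypeInformation.of(new TypeHint<MaybeRecord<String>>() {});
    }
}

Whether the consumer should additionally treat CORRUPT records as hard errors by default is exactly the question above.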