Hi All,

Thanks to Gary, I recently came upon an interesting cluster of issues:
 - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer to 
skip corrupted messages
 - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible error 
handling in the Kafka consumer
 - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema handle 
message record which value is null

In light of the last one, I’d like to look again at the first two. The 
behaviour they introduced is that when the deserialisation schema returns NULL, 
the Kafka consumer (and possibly also the Kinesis consumer) silently drops the 
record. In Kafka, NULL values have semantic meaning, i.e. they usually encode a 
DELETE for the key of the message. If SimpleStringSchema returned that null, 
our consumer would silently drop it and we would lose that DELETE message. That 
doesn’t seem right.

I think the right solution for null handling is to introduce a custom record 
type that encodes both Kafka NULL values and the possibility of a corrupt 
message that cannot be deserialised. Something like an Either type. It’s then 
up to the application to handle those cases. 
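To make that a bit more concrete, here is a rough sketch (in Java, with made-up 
names, just for illustration) of what such a wrapper type could look like. A 
record is either a real value, a tombstone (a Kafka NULL value, i.e. a DELETE 
for the key), or a corrupt message that could not be deserialised:

import java.io.Serializable;

/**
 * Hypothetical wrapper type, names made up for illustration. A record is
 * either a real value, a tombstone (Kafka NULL value), or a corrupt message
 * that could not be deserialised. The application decides how to handle each.
 */
public final class StringRecord implements Serializable {

    public enum Kind { VALUE, TOMBSTONE, CORRUPT }

    private final Kind kind;
    private final String value;    // only set for VALUE
    private final byte[] rawBytes; // only set for CORRUPT

    private StringRecord(Kind kind, String value, byte[] rawBytes) {
        this.kind = kind;
        this.value = value;
        this.rawBytes = rawBytes;
    }

    public static StringRecord value(String value) {
        return new StringRecord(Kind.VALUE, value, null);
    }

    public static StringRecord tombstone() {
        return new StringRecord(Kind.TOMBSTONE, null, null);
    }

    public static StringRecord corrupt(byte[] rawBytes) {
        return new StringRecord(Kind.CORRUPT, null, rawBytes);
    }

    public Kind getKind() {
        return kind;
    }

    public String getValue() {
        return value;
    }

    public byte[] getRawBytes() {
        return rawBytes;
    }
}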

Concretely, I want to discuss whether we should change our consumers so that 
they no longer silently drop null records but instead treat them as errors. For 
FLINK-11820, the solution would be for users to write their own custom schema 
that handles null values and returns a user-defined type that signals a null 
value.
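Sketched out, such a schema could look roughly like this (again only an 
illustration, assuming the DeserializationSchema interface from flink-core and 
the hypothetical StringRecord type from above; the consumer hands the schema 
the raw record value, which is null for Kafka tombstones):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
 * Hypothetical string schema that never returns null: a NULL Kafka value is
 * mapped to StringRecord.tombstone() instead of being dropped by the consumer.
 */
public class TombstoneAwareStringSchema implements DeserializationSchema<StringRecord> {

    @Override
    public StringRecord deserialize(byte[] message) throws IOException {
        if (message == null) {
            // Kafka DELETE marker: surface it instead of swallowing it.
            return StringRecord.tombstone();
        }
        return StringRecord.value(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(StringRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<StringRecord> getProducedType() {
        return TypeInformation.of(StringRecord.class);
    }
}

With something along these lines the null-dropping behaviour in the consumer 
wouldn’t be needed at all, because the schema itself never returns null.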

What do you think?

Aljoscha
