Hi Aljoscha,

Sorry for the late reply. I think the solution makes sense. Using the NULL return value to mark a message as corrupted is not a valid approach, since a NULL value has semantic meaning not just in Kafka but also in many other contexts.

I was wondering if we can have a more meaningful interface for dealing with corrupted messages. I am thinking of two options off the top of my head:

1. Create some special deserializer attribute (or a special record) to indicate corrupted messages, like you suggested; this way we can not only encode the deserialization error but also allow users to encode any corruption information for downstream processing.
2. Create a standard fetch error handling API on AbstractFetcher (for Kafka) and DataFetcher (for Kinesis); this way we can also handle errors other than deserialization problems, for example even lower-level exceptions like CRC check failures.

I think either way will work. Also, as long as there's a way for end users to extend the error handling for message corruption, it will not reintroduce the problems the two original JIRAs were trying to address.
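To make option 1 a bit more concrete, here is a rough sketch of what such a special record could look like. All names below are made up for illustration and are not existing Flink API; the point is only to distinguish a real value, a genuine Kafka NULL (tombstone), and a corrupted record whose raw bytes and error are kept for downstream handling:

// Hypothetical result type for a deserialization schema (names are illustrative only).
public final class DeserializationResult<T> {

    public enum Kind { VALUE, TOMBSTONE, CORRUPTED }

    private final Kind kind;
    private final T value;          // set only when kind == VALUE
    private final byte[] rawBytes;  // set only when kind == CORRUPTED
    private final Throwable error;  // set only when kind == CORRUPTED

    private DeserializationResult(Kind kind, T value, byte[] rawBytes, Throwable error) {
        this.kind = kind;
        this.value = value;
        this.rawBytes = rawBytes;
        this.error = error;
    }

    // A successfully deserialized value.
    public static <T> DeserializationResult<T> value(T value) {
        return new DeserializationResult<>(Kind.VALUE, value, null, null);
    }

    // A genuine Kafka NULL value, e.g. a DELETE/tombstone for the key.
    public static <T> DeserializationResult<T> tombstone() {
        return new DeserializationResult<>(Kind.TOMBSTONE, null, null, null);
    }

    // A record that could not be deserialized; keep the raw bytes and cause.
    public static <T> DeserializationResult<T> corrupted(byte[] rawBytes, Throwable error) {
        return new DeserializationResult<>(Kind.CORRUPTED, null, rawBytes, error);
    }

    public Kind getKind() { return kind; }
    public T getValue() { return value; }
    public byte[] getRawBytes() { return rawBytes; }
    public Throwable getError() { return error; }
}

A user-defined schema could then return tombstone() for a real Kafka NULL and corrupted(...) when deserialization fails, and the job could, for example, route corrupted records to a side output instead of the consumer silently dropping them.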
--
Rong

On Tue, Jun 18, 2019 at 2:06 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi All,
>
> Thanks to Gary, I recently came upon an interesting cluster of issues:
> - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer
> to skip corrupted messages
> - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible
> error handling in the Kafka consumer
> - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema
> handle message record which value is null
>
> In light of the last one I’d like to look again at the first two. What
> they introduced is that when the deserialisation schema returns NULL, the
> Kafka consumer (and maybe also the Kinesis consumer) silently drops the
> record. In Kafka NULL values have semantic meaning, i.e. they usually
> encode a DELETE for the key of the message. If SimpleStringSchema returned
> that null, our consumer would silently drop it and we would lose that
> DELETE message. That doesn’t seem right.
>
> I think the right solution for null handling is to introduce a custom
> record type that encodes both Kafka NULL values and the possibility of a
> corrupt message that cannot be deserialised. Something like an Either type.
> It’s then up to the application to handle those cases.
>
> Concretely, I want to discuss whether we should change our consumers to
> not silently drop null records, but instead see them as errors. For
> FLINK-11820, the solution is for users to write their own custom schema
> that handles null values and returns a user-defined type that signals null
> values.
>
> What do you think?
>
> Aljoscha