Hi Aljoscha,

Sorry for the late reply, I think the solution makes sense. Using a NULL
return value to mark a message as corrupted is not a valid approach, since
NULL values carry semantic meaning not just in Kafka but in many other
contexts as well.

I was wondering if we can have a more meaningful interface for dealing with
corrupted messages. Two options come to mind off the top of my head:
1. Create some special deserializer attribute (or a special record) to
indicate corrupted messages, as you suggested; this way we can not only
encode the deserialization error but also let users attach any corruption
information for downstream processing (see the rough sketch after this list).
2. Create a standard fetch error handling API on AbstractFetcher (for
Kafka) and DataFetcher (for Kinesis); this way we can also handle errors
other than deserialization problems, for example even lower-level
exceptions like CRC check failures.
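
To make option 1 a bit more concrete, here is a rough sketch of what such a
special record could look like. This is only an illustration; DeserializedRecord
and everything in it is a hypothetical name, not an existing Flink class. The
idea is that a user-defined deserialization schema emits this wrapper instead of
returning null, so a legitimate Kafka tombstone (an actual NULL value) and a
corrupted payload stay distinguishable, and downstream operators can branch on
isCorrupted():

// Hypothetical wrapper emitted by a user-defined deserialization schema.
// A successful result may still carry a null value (a Kafka tombstone);
// a corrupted result keeps the raw bytes and the cause of the failure.
public final class DeserializedRecord<T> {

    private final T value;         // deserialized value; may be null for a Kafka tombstone
    private final byte[] rawBytes; // original payload, kept only for corrupted records
    private final Throwable error; // cause of the corruption, null if the record is fine

    private DeserializedRecord(T value, byte[] rawBytes, Throwable error) {
        this.value = value;
        this.rawBytes = rawBytes;
        this.error = error;
    }

    public static <T> DeserializedRecord<T> ok(T value) {
        return new DeserializedRecord<>(value, null, null);
    }

    public static <T> DeserializedRecord<T> corrupted(byte[] rawBytes, Throwable error) {
        return new DeserializedRecord<>(null, rawBytes, error);
    }

    public boolean isCorrupted() {
        return error != null;
    }

    public T getValue() {
        if (isCorrupted()) {
            throw new IllegalStateException("Record is corrupted", error);
        }
        return value;
    }

    public byte[] getRawBytes() {
        return rawBytes;
    }

    public Throwable getError() {
        return error;
    }
}

With something along these lines, an application could for example route
corrupted records to a side output or a dead-letter topic, instead of the
consumer silently dropping them.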

I think either way will work. Also, as long as there is a way for end users
to extend the error handling for message corruption, it will not
reintroduce the problems the two original JIRAs were trying to address.

--
Rong

On Tue, Jun 18, 2019 at 2:06 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi All,
>
> Thanks to Gary, I recently came upon an interesting cluster of issues:
>  - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka consumer
> to skip corrupted messages
>  - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible
> error handling in the Kafka consumer
>  - https://issues.apache.org/jira/browse/FLINK-11820: SimpleStringSchema
> handle message record which value is null
>
> In light of the last one I’d like to look again at the first two. What
> they introduced is that when the deserialisation schema returns NULL, the
> Kafka consumer (and maybe also the Kinesis consumer) silently drops the
> record. In Kafka NULL values have semantic meaning, i.e. they usually
> encode a DELETE for the key of the message. If SimpleStringSchema returned
> that null, our consumer would silently drop it and we would lose that
> DELETE message. That doesn’t seem right.
>
> I think the right solution for null handling is to introduce a custom
> record type that encodes both Kafka NULL values and the possibility of a
> corrupt message that cannot be deserialised. Something like an Either type.
> It’s then up to the application to handle those cases.
>
> Concretely, I want to discuss whether we should change our consumers to
> not silently drop null records, but instead see them as errors. For
> FLINK-11820, the solution is for users to write their own custom schema
> that handles null values and returns a user-defined type that signals null
> values.
>
> What do you think?
>
> Aljoscha
>
>
