I think actually most of Flink was not designed to handle NULL values and as 
far as I remember, some people think that Flink shouldn’t handle NULL values. 
The fact that some parts support NULL values is more by accident than by 
conscious planning.

Aljoscha

> On 24. Jun 2019, at 10:07, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> Thanks for raising the issue. It seems there are two issues here: 1) The
> null value handling, and 2) The error handling.
> 
> For null value handling, my understanding is the following:
>  - Null values could have a realistic meaning in some systems. So Flink
> needs to support them.
>  - By design, in Flink, the records passed between Flink operators have
> already supported null values. They are wrapped in StreamRecord.
>  - Some user facing APIs, however, seem not fully support null values.
> e.g. the Collector.
>  - The connector code are sort of "user code" from Flink's perspective. So
> each connector should decide how null value should be treated.
> If we want to support null values in Flink everywhere, we may need to look
> into those user facing APIs that do not take null values. Wrapping the user
> returned value looks reasonable, ideally the wrapper class should also be
> StreamRecord so it is consistent with what we have for those records passed
> between operators.
> 
> WRT error handling, I agree with Xiaowei that the error handling mechanism
> should be something generic to the entire project instead of just for
> connectors. This reminds of another discussion thread which proposes to add
> a pluggable to categorize and report exceptions causing job failure [1]. It
> might worth thinking to see whether it makes sense to design the error
> handling and reporting as a whole.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> [1]
> https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> 
> 
> 
> 
> On Sun, Jun 23, 2019 at 9:40 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> 
>> Error handling policy for streaming jobs goes beyond potential corrupted
>> messages in the source. Users may have subtle bugs while processing some
>> messages which may cause the streaming jobs to fail. Even though this can
>> be considered as a bug in user's code, users may prefer skip such messages
>> (or log them) and let the job continue in some cases. This may be an
>> opportunity to take such cases into consideration as well.
>> 
>> Xiaowei
>> 
>> On Fri, Jun 21, 2019 at 11:43 PM Rong Rong <walter...@gmail.com> wrote:
>> 
>>> Hi Aljoscha,
>>> 
>>> Sorry for the late reply, I think the solution makes sense. Using the NULL
>>> return value to mark a message is corrupted is not a valid way since NULL
>>> value has semantic meaning in not just Kafka but also in a lot of other
>>> contexts.
>>> 
>>> I was wondering if we can have a more meaningful interface for dealing
>>> with
>>> corrupted messages. I am thinking of 2 options on top of my head:
>>> 1. Create some special deserializer attribute (or a special record) to
>>> indicate corrupted messages like you suggested; this way we can not only
>>> encode the deserializing error but allow users to encode any corruption
>>> information for downstream processing.
>>> 2. Create a standard fetch error handling API on AbstractFetcher (for
>>> Kafka) and DataFetcher (for Kinesis); this way we can also handle error's
>>> other than deserializing problem, for example some even lower level
>>> exceptions like CRC check failure.
>>> 
>>> I think either way will work. Also, as long as there's a way for end users
>>> to extend the error handling for message corruption, it will not
>>> reintroduce the problems these 2 original JIRA was trying to address.
>>> 
>>> --
>>> Rong
>>> 
>>> On Tue, Jun 18, 2019 at 2:06 AM Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>> 
>>>> Hi All,
>>>> 
>>>> Thanks to Gary, I recently came upon an interesting cluster of issues:
>>>> - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka
>>> consumer
>>>> to skip corrupted messages
>>>> - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible
>>>> error handling in the Kafka consumer
>>>> - https://issues.apache.org/jira/browse/FLINK-11820:
>>> SimpleStringSchema
>>>> handle message record which value is null
>>>> 
>>>> In light of the last one I’d like to look again at the first two. What
>>>> they introduced is that when the deserialisation schema returns NULL,
>>> the
>>>> Kafka consumer (and maybe also the Kinesis consumer) silently drops the
>>>> record. In Kafka NULL values have semantic meaning, i.e. they usually
>>>> encode a DELETE for the key of the message. If SimpleStringSchema
>>> returned
>>>> that null, our consumer would silently drop it and we would lose that
>>>> DELETE message. That doesn’t seem right.
>>>> 
>>>> I think the right solution for null handling is to introduce a custom
>>>> record type that encodes both Kafka NULL values and the possibility of a
>>>> corrupt message that cannot be deserialised. Something like an Either
>>> type.
>>>> It’s then up to the application to handle those cases.
>>>> 
>>>> Concretely, I want to discuss whether we should change our consumers to
>>>> not silently drop null records, but instead see them as errors. For
>>>> FLINK-11820, the solution is for users to write their own custom schema
>>>> that handles null values and returns a user-defined types that signals
>>> null
>>>> values.
>>>> 
>>>> What do you think?
>>>> 
>>>> Aljoscha
>>>> 
>>>> 
>>> 
>> 

Reply via email to