I think most of Flink was not designed to handle NULL values, and as far as I remember, some people think that Flink shouldn't handle NULL values at all. The fact that some parts support NULL values is more by accident than by conscious planning.
Aljoscha

> On 24. Jun 2019, at 10:07, Becket Qin <becket....@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for raising the issue. It seems there are two issues here: 1) the
> null value handling, and 2) the error handling.
>
> For null value handling, my understanding is the following:
> - Null values can have a real meaning in some systems, so Flink needs to
> support them.
> - By design, the records passed between Flink operators already support
> null values; they are wrapped in StreamRecord.
> - Some user-facing APIs, however, do not seem to fully support null
> values, e.g. the Collector.
> - Connector code is sort of "user code" from Flink's perspective, so each
> connector should decide how null values are treated.
> If we want to support null values everywhere in Flink, we may need to
> look into those user-facing APIs that do not take null values. Wrapping
> the user-returned value looks reasonable; ideally the wrapper class would
> also be StreamRecord, so it is consistent with what we have for the
> records passed between operators.
>
> WRT error handling, I agree with Xiaowei that the error handling
> mechanism should be something generic to the entire project instead of
> just for connectors. This reminds me of another discussion thread, which
> proposes adding a pluggable interface to categorize and report exceptions
> causing job failures [1]. It might be worth thinking about whether it
> makes sense to design error handling and reporting as a whole.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1] https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
>
> On Sun, Jun 23, 2019 at 9:40 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
>
>> Error handling policy for streaming jobs goes beyond potentially
>> corrupted messages in the source. Users may have subtle bugs while
>> processing some messages, which may cause the streaming job to fail.
>> Even though this can be considered a bug in the user's code, users may
>> in some cases prefer to skip such messages (or log them) and let the
>> job continue. This may be an opportunity to take such cases into
>> consideration as well.
>>
>> Xiaowei
>>
>> On Fri, Jun 21, 2019 at 11:43 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Sorry for the late reply. I think the solution makes sense. Using a
>>> NULL return value to mark a message as corrupted is not a valid
>>> approach, since a NULL value has semantic meaning not just in Kafka
>>> but also in a lot of other contexts.
>>>
>>> I was wondering if we can have a more meaningful interface for dealing
>>> with corrupted messages. I am thinking of two options off the top of
>>> my head:
>>> 1. Create a special deserializer attribute (or a special record) to
>>> indicate corrupted messages, as you suggested; this way we can not
>>> only encode the deserialization error but also allow users to encode
>>> any corruption information for downstream processing (a sketch of what
>>> such a record type might look like follows below).
>>> 2. Create a standard fetch error handling API on AbstractFetcher (for
>>> Kafka) and DataFetcher (for Kinesis); this way we can also handle
>>> errors other than deserialization problems, for example even
>>> lower-level exceptions like CRC check failures.
>>>
>>> I think either way will work. Also, as long as there is a way for end
>>> users to extend the error handling for message corruption, it will not
>>> reintroduce the problems the two original JIRAs were trying to
>>> address.
>>>
>>> --
>>> Rong
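[For illustration, a minimal sketch of what such a special record type (Rong's option 1, and the Either-style type Aljoscha proposes further down) might look like. All names here, such as DeserializationResult, are hypothetical and not existing Flink API:]

    import java.util.Optional;

    // Hypothetical Either-style result type: distinguishes a successfully
    // deserialized value, a Kafka tombstone (a NULL value, i.e. a DELETE
    // for the message key), and a corrupt record that could not be
    // deserialized.
    public final class DeserializationResult<T> {

        public enum Kind { VALUE, TOMBSTONE, CORRUPT }

        private final Kind kind;
        private final T value;         // set only for VALUE
        private final byte[] rawBytes; // set only for CORRUPT
        private final Throwable error; // set only for CORRUPT

        private DeserializationResult(
                Kind kind, T value, byte[] rawBytes, Throwable error) {
            this.kind = kind;
            this.value = value;
            this.rawBytes = rawBytes;
            this.error = error;
        }

        public static <T> DeserializationResult<T> value(T value) {
            return new DeserializationResult<>(Kind.VALUE, value, null, null);
        }

        public static <T> DeserializationResult<T> tombstone() {
            return new DeserializationResult<>(Kind.TOMBSTONE, null, null, null);
        }

        // Keeps the raw bytes and the cause so downstream code can log them
        // or route them to a side output / dead-letter queue.
        public static <T> DeserializationResult<T> corrupt(
                byte[] rawBytes, Throwable error) {
            return new DeserializationResult<>(Kind.CORRUPT, null, rawBytes, error);
        }

        public Kind kind() { return kind; }
        public Optional<T> getValue() { return Optional.ofNullable(value); }
        public Optional<byte[]> getRawBytes() { return Optional.ofNullable(rawBytes); }
        public Optional<Throwable> getError() { return Optional.ofNullable(error); }
    }

[With a type like this, the consumer never has to interpret a bare null: a tombstone and a corrupt message become distinct, explicit cases that the application can match on.]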
>>> On Tue, Jun 18, 2019 at 2:06 AM Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Thanks to Gary, I recently came upon an interesting cluster of issues:
>>>> - https://issues.apache.org/jira/browse/FLINK-3679: Allow Kafka
>>>> consumer to skip corrupted messages
>>>> - https://issues.apache.org/jira/browse/FLINK-5583: Support flexible
>>>> error handling in the Kafka consumer
>>>> - https://issues.apache.org/jira/browse/FLINK-11820:
>>>> SimpleStringSchema handle message record which value is null
>>>>
>>>> In light of the last one, I'd like to look again at the first two.
>>>> What they introduced is that when the deserialization schema returns
>>>> NULL, the Kafka consumer (and maybe also the Kinesis consumer)
>>>> silently drops the record. In Kafka, NULL values have semantic
>>>> meaning, i.e. they usually encode a DELETE for the key of the
>>>> message. If SimpleStringSchema returned that null, our consumer would
>>>> silently drop it and we would lose that DELETE message. That doesn't
>>>> seem right.
>>>>
>>>> I think the right solution for null handling is to introduce a custom
>>>> record type that encodes both Kafka NULL values and the possibility
>>>> of a corrupt message that cannot be deserialized. Something like an
>>>> Either type. It is then up to the application to handle those cases.
>>>>
>>>> Concretely, I want to discuss whether we should change our consumers
>>>> to not silently drop null records, but instead treat them as errors.
>>>> For FLINK-11820, the solution is for users to write their own custom
>>>> schema that handles null values and returns a user-defined type that
>>>> signals null values.
>>>>
>>>> What do you think?
>>>>
>>>> Aljoscha
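[For FLINK-11820 specifically, a user-defined schema along the lines Aljoscha suggests could map a NULL Kafka value to an explicit marker record instead of returning null. A minimal sketch against Flink's DeserializationSchema interface; the StringOrTombstoneSchema and Record names are made up for illustration, under the assumption that the connector passes a null byte array for a Kafka tombstone:]

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class StringOrTombstoneSchema
            implements DeserializationSchema<StringOrTombstoneSchema.Record> {

        // User-defined record that makes Kafka tombstones explicit instead
        // of relying on a null return value.
        public static class Record {
            public final String value;     // null only when tombstone is true
            public final boolean tombstone;

            public Record(String value, boolean tombstone) {
                this.value = value;
                this.tombstone = tombstone;
            }
        }

        @Override
        public Record deserialize(byte[] message) throws IOException {
            if (message == null) {
                // A Kafka NULL value (a DELETE for the key): return a marker
                // record instead of null, so the consumer does not silently
                // drop it.
                return new Record(null, true);
            }
            return new Record(new String(message, StandardCharsets.UTF_8), false);
        }

        @Override
        public boolean isEndOfStream(Record nextElement) {
            return false;
        }

        @Override
        public TypeInformation<Record> getProducedType() {
            return TypeInformation.of(new TypeHint<Record>() {});
        }
    }

[Downstream operators can then check the tombstone flag and handle DELETEs explicitly, rather than the consumer deciding on their behalf.]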