[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896253#comment-15896253 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104311996

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -381,6 +381,10 @@ else if (partitionsRemoved) {
                 partitionsIterator.remove();
                 continue partitionsLoop;
             }
    +
    +        if (value == null) {
    +            continue;
    +        }
    --- End diff --

    Would it make sense to do the `null` checking inside `emitRecord(...)`? Otherwise, we wouldn't be updating the state for skipped records, and therefore not accounting for them as "already processed". I don't think it really matters, since we aren't outputting anything anyway, but I see at least one minor advantage that might deserve the change: if we fail during a series of continuously skipped records, we won't waste any overhead re-processing them on restore.

> Allow Kafka consumer to skip corrupted messages
> -----------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>      Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.
> Particularly important here is the case where you receive a message from a source (such as Kafka) and the raw bytes don't deserialize properly. Right now the only recourse is to throw an IOException and therefore fail the job.
> This is definitely not good since bad data is a reality and failing the job is not the right option. If the job fails we'll just end up replaying the bad data and the whole thing will start again.
> Instead, in this case it would be best if the user could just return the empty set.
> The other case is where one input message should logically be multiple output messages. This case is probably less important since there are other ways to do this, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.
> Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
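The reviewer's suggestion above — moving the `null` check inside `emitRecord(...)` so the offset state still advances for skipped records — can be sketched as follows. This is a hypothetical simplification, not the actual Flink `SimpleConsumerThread` code; the class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of the review suggestion: do the null check inside
 * emitRecord(...) so that the partition offset state is updated even for
 * skipped (corrupted) records. With the offset advanced, a restore after
 * a failure does not re-read a long run of already-skipped records.
 */
class SkippingEmitter {
    private final AtomicLong lastProcessedOffset = new AtomicLong(-1L);
    private final List<Object> output = new ArrayList<>();

    /** Emits the record downstream unless it is null; always updates the offset. */
    void emitRecord(Object value, long offset) {
        if (value != null) {
            output.add(value); // stand-in for the real downstream emission
        }
        // Update state even for skipped records so they count as "already processed".
        lastProcessedOffset.set(offset);
    }

    long lastProcessedOffset() {
        return lastProcessedOffset.get();
    }

    List<Object> emitted() {
        return output;
    }
}
```

A skipped record then still moves the checkpointable offset forward, which is exactly the restore-time saving the comment describes.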
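The FlatMap-style variant the issue asks for can be sketched like this. The interface name and signature below are purely illustrative assumptions, not Flink's actual `DeserializationSchema` API: the point is that one raw message may yield zero elements (corrupted bytes are silently skipped instead of failing the job) or several:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/**
 * Hypothetical FlatMap-like deserialization interface: instead of
 * returning exactly one element (or throwing), the schema pushes
 * zero or more elements into a collector.
 */
interface CollectingDeserializationSchema<T> {
    void deserialize(byte[] message, Consumer<T> out);
}

/** Example schema: splits a UTF-8 message into lines, skipping bad input. */
class LineSplittingSchema implements CollectingDeserializationSchema<String> {
    @Override
    public void deserialize(byte[] message, Consumer<String> out) {
        if (message == null || message.length == 0) {
            return; // corrupted/empty input: emit nothing rather than throw
        }
        // One input message may map to multiple logical elements.
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            if (!line.isEmpty()) {
                out.accept(line);
            }
        }
    }
}
```

With this shape, "return the empty set" for bad data is just not calling the collector, and the one-to-many case falls out of calling it repeatedly.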