Hello,

When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema, if an invalid, non-parseable message is sent to the Kafka topic, the consumer fails, as expected, with a JsonParseException. So far so good, but this leads to the following loop: the job switches to FAILED, attempts to restart, fails again, and so on. That is, the parsing error prevents the Kafka message from being committed, so it keeps being received.
Since the JsonParseException can't be caught in application code, what would be the recommended way to handle possibly non-parseable Kafka messages?
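For context, the job is essentially just the consumer reading from a single topic, roughly like this (simplified; the class name, topic and Kafka properties are placeholders for my local setup):

import java.util.Properties;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class KafkaJsonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection details for the local setup
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-group");

        // JSONKeyValueDeserializationSchema(false): don't include record metadata
        FlinkKafkaConsumer011<ObjectNode> consumer = new FlinkKafkaConsumer011<>(
                "my-topic",
                new JSONKeyValueDeserializationSchema(false),
                props);

        DataStream<ObjectNode> stream = env.addSource(consumer);
        stream.print();

        env.execute("kafka-json-job");
    }
}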
 
Is there a way to configure the Flink Kafka consumer to handle non-parseable messages by logging the parsing error and committing the message, so that processing can continue? If there isn't, would such an enhancement make sense?

Unless there is a better solution, it seems one has to guarantee that FlinkKafkaConsumer011 only receives valid messages, which can be annoying in practice.
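The only alternative I can see at the moment is to wrap JSONKeyValueDeserializationSchema in a schema that swallows the parse error, along these lines (just a sketch; the class name is mine, and I'm assuming, without being certain, that a null value returned from deserialize() makes the consumer skip the record):

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delegates to JSONKeyValueDeserializationSchema but, instead of letting the
 * JsonParseException bubble up and fail the job, logs the bad record and
 * returns null (assumption: a null value is skipped by the Kafka consumer).
 */
public class LenientJSONKeyValueDeserializationSchema
        implements KeyedDeserializationSchema<ObjectNode> {

    private static final Logger LOG =
            LoggerFactory.getLogger(LenientJSONKeyValueDeserializationSchema.class);

    private final JSONKeyValueDeserializationSchema delegate;

    public LenientJSONKeyValueDeserializationSchema(boolean includeMetadata) {
        this.delegate = new JSONKeyValueDeserializationSchema(includeMetadata);
    }

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        try {
            return delegate.deserialize(messageKey, message, topic, partition, offset);
        } catch (IOException e) {
            // JsonParseException extends IOException, so parse errors land here
            LOG.warn("Skipping non-parseable record at {}-{}, offset {}", topic, partition, offset, e);
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}

If returning null isn't actually treated as "skip this record", then this doesn't help, hence my question.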

For reference, here's the stack trace of the JsonParseException from the log:

Source: Custom Source(1/1) switched to FAILED
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
at [Source: UNKNOWN; line: 1, column: 3]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

My env: Flink 1.4.0 and kafka_2.11-1.0.0, running locally on a Mac.

Thanks,
Adrian