Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema

On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
> 
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer expectedly fails with JsonParseException. So far so good, but
> this leads to the following loop: the job switches to FAILED
> then attempts to restart and fails again, and so on. That is, the
> parsing error leads to the Kafka message not being committed, hence it
> keeps being received. 
> Since the JsonParseException can't be catched in application code, what
> would be the recommended way to handle the case of possibly
> non-parseable Kafka messages?
>  
> Is there is a way to configure the Flink Kafka consumer to treat the
> case of non-parseable messages by logging the parsing error then
> committing the message such that the processing can continue? Is there
> isn't, would such an enhancement make sense?
> 
> Unless there is a better solution, it looks as a requirement to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
> 
> For reference, here's the stack of the JsonParseException in the log:
> 
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unexpected character (':' (code 58)): Expected space separating
> root-level values
> at [Source: UNKNOWN; line: 1, column: 3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> 
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
> 
> Thanks,
> Adrian
> Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
> Compagnie IBM France
> Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> RCS Nanterre 552 118 465
> Forme Sociale : S.A.S.
> Capital Social : 657.364.587 €
> SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to