Hi Adrian,

couldn't you solve this by providing your own DeserializationSchema [1], possibly extending JSONKeyValueDeserializationSchema and catching the error there?
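Below is a minimal, dependency-free sketch of that idea. It rests on some assumptions: `RecordDeserializer`, `ToyJsonObjectDeserializer`, `SkipCorruptRecords` and `FetchLoopModel` are hypothetical names and not Flink API. `RecordDeserializer` only mimics the shape of a keyed deserialization interface, the toy parser stands in for Jackson, and `FetchLoopModel` models a consumer that drops records for which deserialization returns null. In a real Flink 1.4 job, the equivalent would be a subclass of JSONKeyValueDeserializationSchema that overrides `deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`, catches the IOException (JsonParseException is a subclass of it), logs it and returns null. The linked documentation describes returning null as the way to let the Kafka consumer skip a corrupted message. Please verify that behaviour against the Flink version you run.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical stand-in for a keyed deserialization interface (not Flink API).
interface RecordDeserializer<T> {
    T deserialize(byte[] key, byte[] value) throws IOException;
}

// Hypothetical toy parser standing in for Jackson: accepts only text that
// looks like a JSON object and throws IOException otherwise.
final class ToyJsonObjectDeserializer implements RecordDeserializer<String> {
    @Override
    public String deserialize(byte[] key, byte[] value) throws IOException {
        String text = new String(value, StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{") || !text.endsWith("}")) {
            throw new IOException("Not a JSON object: " + text);
        }
        return text;
    }
}

// Wraps any deserializer: on a parse error it logs and returns null
// instead of propagating the exception and failing the source.
final class SkipCorruptRecords<T> implements RecordDeserializer<T> {
    private static final Logger LOG = Logger.getLogger(SkipCorruptRecords.class.getName());
    private final RecordDeserializer<T> delegate;

    SkipCorruptRecords(RecordDeserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(byte[] key, byte[] value) {
        try {
            return delegate.deserialize(key, value);
        } catch (IOException e) {
            LOG.warning("Skipping unparsable record: " + e.getMessage());
            return null; // signals "skip this record" to the modeled consumer
        }
    }
}

// Hypothetical model of a fetch loop that emits only non-null records.
final class FetchLoopModel {
    static <T> List<T> consume(List<byte[]> values, RecordDeserializer<T> schema) throws IOException {
        List<T> emitted = new ArrayList<>();
        for (byte[] value : values) {
            T record = schema.deserialize(null, value);
            if (record != null) {
                emitted.add(record);
            }
        }
        return emitted;
    }
}
```

The design choice is to drop the bad record rather than rethrow, so the consumer keeps advancing past it instead of failing and replaying the same message forever. The trade-off is that malformed messages are lost unless you also log or forward them somewhere for inspection.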
Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema

On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
>
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer fails with a JsonParseException, as expected. So far so good,
> but this leads to the following loop: the job switches to FAILED, then
> attempts to restart and fails again, and so on. That is, the parsing
> error leads to the Kafka message not being committed, hence it keeps
> being received.
> Since the JsonParseException can't be caught in application code, what
> would be the recommended way to handle possibly non-parsable Kafka
> messages?
>
> Is there a way to configure the Flink Kafka consumer to handle
> non-parsable messages by logging the parsing error and then committing
> the message so that processing can continue? If there isn't, would such
> an enhancement make sense?
>
> Unless there is a better solution, it looks like a requirement to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
>
> For reference, here's the stack trace of the JsonParseException in the log:
>
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
>  at [Source: UNKNOWN; line: 1, column: 3]
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
>     at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
>     at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
>
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
>
> Thanks,
> Adrian