Nice, I hadn't even read that far myself :P It turns out the API was prepared for that after all.
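The contract Adrian quotes from the DeserializationSchema documentation (return null so that the Flink Kafka consumer silently skips the corrupted message) can be sketched as below. This is a minimal, self-contained model under stated assumptions, not Flink code. KeyedDeserializer is a hypothetical stand-in for Flink 1.4's KeyedDeserializationSchema that uses the same deserialize(messageKey, message, topic, partition, offset) parameters. Strict UTF-8 decoding stands in for the Jackson parse. The consume method only imitates the consumer's fetch loop dropping null results. In a real job, the same try/catch would go into a subclass of JSONKeyValueDeserializationSchema that overrides deserialize(...). Jackson's JsonParseException is an IOException, just like the CharacterCodingException thrown here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkipCorruptedRecordsDemo {

    // Hypothetical stand-in for Flink's KeyedDeserializationSchema<T>:
    // same deserialize(...) parameter list, but no Flink dependency.
    interface KeyedDeserializer<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                throws IOException;
    }

    // Stand-in parser: strict UTF-8 decoding. Malformed input throws a
    // CharacterCodingException (an IOException), much like Jackson's JsonParseException.
    static final KeyedDeserializer<String> STRICT_UTF8 = (key, message, topic, partition, offset) -> {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        return decoder.decode(ByteBuffer.wrap(message)).toString();
    };

    // Decorator: delegate to the wrapped schema; on a parse failure, log the error
    // and return null, which per the documented contract means "skip this record".
    static final class SkipCorrupted<T> implements KeyedDeserializer<T> {
        private final KeyedDeserializer<T> inner;

        SkipCorrupted(KeyedDeserializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
            try {
                return inner.deserialize(key, message, topic, partition, offset);
            } catch (IOException e) {
                System.err.printf("Skipping corrupted record %s-%d@%d: %s%n", topic, partition, offset, e);
                return null;
            }
        }
    }

    // Simulated fetch loop: like the Flink Kafka consumer, emit only non-null results.
    static <T> List<T> consume(KeyedDeserializer<T> schema, List<byte[]> messages) throws IOException {
        List<T> emitted = new ArrayList<>();
        long offset = 0;
        for (byte[] message : messages) {
            T record = schema.deserialize(null, message, "events", 0, offset++);
            if (record != null) {
                emitted.add(record);
            }
        }
        return emitted;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> messages = Arrays.asList(
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
                new byte[] {(byte) 0xC3, (byte) 0x28}, // malformed UTF-8, i.e. a "corrupted" message
                "{\"id\":2}".getBytes(StandardCharsets.UTF_8));
        // prints [{"id":1}, {"id":2}]
        System.out.println(consume(new SkipCorrupted<>(STRICT_UTF8), messages));
    }
}
```

Running main logs one skip (offset 1) to stderr and emits only the two decodable records. The wrapper does nothing beyond the quoted contract, so any richer handling (metrics, alerting) stays use-case specific.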
I'm not sure about a default option for handling/skipping corrupted messages, since how to handle them is probably highly use-case specific. If you nonetheless feel that this should be in there, feel free to open an improvement request in our issue tracker at https://issues.apache.org/jira/browse/FLINK

Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I had missed the clarification of
> the contract given by the piece of documentation you pointed to:
> "returning null to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement for JSONKeyValueDeserializationSchema
> to provide this behaviour as an out-of-the-box option. But anyway, I do
> have a solution in hand.
> Thanks again.
> Adrian
>
>
> ----- Original message -----
> From: Nico Kruber <n...@data-artisans.com>
> To: Adrian Vasiliu <vasi...@fr.ibm.com>, user@flink.apache.org
> Cc:
> Subject: Re: Unrecoverable job failure after Json parse error?
> Date: Tue, Jan 16, 2018 11:34 AM
>
> Hi Adrian,
> couldn't you solve this by providing your own DeserializationSchema [1],
> possibly extending from JSONKeyValueDeserializationSchema and catching
> the error there?
>
>
> Nico
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
>
> On 12/01/18 18:26, Adrian Vasiliu wrote:
> > Hello,
> >
> > When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> > if an invalid, non-parsable message is sent to the Kafka topic, the
> > consumer fails with a JsonParseException, as expected. So far so good, but
> > this leads to the following loop: the job switches to FAILED,
> > then attempts to restart and fails again, and so on. That is, the
> > parsing error leads to the Kafka message not being committed, hence it
> > keeps being received.
> >
> > Since the JsonParseException can't be caught in application code, what
> > would be the recommended way to handle possibly non-parsable Kafka
> > messages?
> >
> > Is there a way to configure the Flink Kafka consumer to handle
> > non-parsable messages by logging the parsing error and then committing
> > the message so that processing can continue? If there isn't, would such
> > an enhancement make sense?
> >
> > Unless there is a better solution, it looks like a requirement to
> > guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> > can be annoying in practice.
> >
> > For reference, here's the stack trace of the JsonParseException in the log:
> >
> > Source: Custom Source(1/1) switched to FAILED
> >
> > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
> >  at [Source: UNKNOWN; line: 1, column: 3]
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
> >     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
> >     at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
> >     at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
> >     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
> >
> > Thanks,
> > Adrian
> >
> > Unless stated otherwise above:
> > Compagnie IBM France
> > Registered office: 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> > RCS Nanterre 552 118 465
> > Legal form: S.A.S.
> > Share capital: 657.364.587 €
> > SIREN/SIRET: 552 118 465 03644 - NAF code 6202A