Hi Users,

Is there a way I can do schema validation on read from Kafka in a Flink
job?

I have a pipeline like this:

Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) -> Kafka Topic
Transformed (Avro data) -> Sink

While reading from the Raw topic I want to validate the schema, so that
when the check fails I can push the event to an error topic. I understand
from the documentation [1] that events which cannot be deserialised are
returned as null and the consumer moves ahead (failing the consumer does
not help, since the same message would be retried with the same result).
Is there a way I can filter out these records when they cannot be
deserialised?
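
One idea I had (just a sketch of what I mean, not something I have
working): wrap every record in a result object instead of returning null,
so the failed records stay in the stream and can be filtered out
downstream. Jackson, the ParseResult type, and the "id" field check below
are my own placeholders, not anything from the Flink docs:

import java.io.Serializable;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Carries either a successfully parsed event or the raw payload plus the failure reason. */
class ParseResult implements Serializable {
    public JsonNode event;   // null when parsing/validation failed
    public byte[] rawBytes;  // original payload, kept so it can go to the error topic
    public String error;     // null when parsing/validation succeeded
}

/**
 * Never returns null: every record is wrapped in a ParseResult, so failed
 * records stay in the stream instead of being silently skipped.
 */
public class ValidatingJsonDeserializationSchema
        extends AbstractDeserializationSchema<ParseResult> {

    private transient ObjectMapper mapper;

    @Override
    public ParseResult deserialize(byte[] message) {
        if (mapper == null) {
            mapper = new ObjectMapper(); // created lazily so the schema stays trivially serializable
        }
        ParseResult result = new ParseResult();
        result.rawBytes = message;
        try {
            JsonNode node = mapper.readTree(message);
            // hypothetical schema check: require an "id" field
            if (node.has("id")) {
                result.event = node;
            } else {
                result.error = "missing required field: id";
            }
        } catch (Exception e) {
            result.error = "not valid JSON: " + e.getMessage();
        }
        return result;
    }
}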

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
'When encountering a corrupted message that cannot be deserialized for any
reason, there are two options - either throwing an exception from the
deserialize(...) method which will cause the job to fail and be restarted,
or returning null to allow the Flink Kafka consumer to silently skip the
corrupted message.'
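
Since returning null silently drops the record, my thought was to never
return null and instead split the wrapped stream with a side output,
writing failures to the error topic. Again only a sketch, using the
ValidatingJsonDeserializationSchema from above; the topic names,
bootstrap servers, and group id are placeholders:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RawTopicValidationJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "raw-validator");           // placeholder

        final OutputTag<String> errorTag = new OutputTag<String>("schema-errors") {};

        SingleOutputStreamOperator<ParseResult> valid = env
            .addSource(new FlinkKafkaConsumer<>(
                "raw", new ValidatingJsonDeserializationSchema(), props))
            .process(new ProcessFunction<ParseResult, ParseResult>() {
                @Override
                public void processElement(ParseResult value, Context ctx,
                                           Collector<ParseResult> out) {
                    if (value.error == null) {
                        out.collect(value); // good event, continues to the Avro step
                    } else {
                        // original payload plus failure reason goes to the error topic
                        ctx.output(errorTag,
                            new String(value.rawBytes, StandardCharsets.UTF_8)
                                + " | " + value.error);
                    }
                }
            });

        valid.getSideOutput(errorTag)
             .addSink(new FlinkKafkaProducer<>("errors", new SimpleStringSchema(), props));

        // ... continue with the JSON -> Avro conversion on `valid` ...

        env.execute("raw topic schema validation");
    }
}

Does this look like a reasonable approach, or is there a more idiomatic
way to do it?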

Thanks,
Hemant
