Hi all, I have a custom deserializer which I pass to a Kafka source to transform JSON string to Scala case class.
val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event], new Event), kafkaProp)) There are time when the JSON message is malformed, in which case I want to catch the exception, log some error message, and go on to the next message without producing an event to the downstream. It doesn't seem like the DeserializationSchema interface allows such behavior. How could I achieve this? Thanks, Jack