Hi! Is there a way to skip or discard messages when using the KafkaSink, so that malformed messages can simply be dropped? I tried returning null for such records, but the KafkaWriter then throws an exception:
```
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
```

What would be the way to handle this?

On the other hand, this seems a bit asymmetric: when reading messages, if the deserializer returns null, the message is simply ignored. See, e.g., https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html :

```
T deserialize(String topic, byte[] data)

Deserialize a record value from a byte array into a value or object.

Parameters:
    topic - topic associated with the data
    data - serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
Returns:
    deserialized typed data; may be null
```
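For reference, the behaviour I am after could be approximated by dropping malformed elements before they ever reach the sink, so that the serializer never has to return null. Below is a minimal plain-Java sketch of that idea. It does not use any Flink or Kafka classes, and `DropMalformedSketch`, `trySerialize`, and `keepWellFormed` are hypothetical names; "malformed" is assumed here to mean "not parseable as an integer", purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch: none of these names are Flink or Kafka API.
class DropMalformedSketch {

    // Stand-in for the real serialization logic. Instead of returning null,
    // it returns an empty Optional when the element cannot be serialized
    // (assumption for this sketch: the element must parse as an integer).
    static Optional<byte[]> trySerialize(String element) {
        try {
            Integer.parseInt(element);
            return Optional.of(element.getBytes(StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Keeps only the elements that serialize successfully, so whatever
    // consumes the result downstream never sees a malformed element.
    static List<String> keepWellFormed(List<String> elements) {
        return elements.stream()
                .filter(e -> trySerialize(e).isPresent())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "oops" is not an integer and is dropped.
        System.out.println(keepWellFormed(List.of("1", "oops", "3")));
    }
}
```

In a Flink job, the same check would presumably go into a `filter` (or `flatMap`) applied to the stream before `sinkTo(...)`, at the cost of validating each element in a separate step ahead of the sink. I would still prefer to know whether the sink itself can skip records.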