Hi! Is there a way to skip or discard messages when using the KafkaSink, so
that malformed messages can simply be dropped instead of being written? I
tried returning null in the corresponding KafkaWriter, but that raises an
exception:

```
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
```

What would be the way to handle this?

This behavior also seems a bit asymmetric: when reading messages, if the
deserializer returns null, the message is simply ignored. See, e.g., the
Javadoc at
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:

```
T deserialize(String topic,
              byte[] data)
Deserialize a record value from a byte array into a value or object.
Parameters:
topic - topic associated with the data
data - serialized bytes; may be null; implementations are recommended to
handle null by returning a value or null rather than throwing an exception.
Returns:
deserialized typed data; may be null
```
