I guess one possibility would be to extend/override the `write` method of the KafkaWriter:
https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

so that it does nothing when the IN element is null. Would this be the only way, really?

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
>
> ```
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored, see, e.g., from
> https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> :
>
> ```
> T deserialize(String topic,
>               byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an exception.
> Returns:
> deserialized typed data; may be null
> ```
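
For reference, here is a minimal sketch of the null-guard idea from the top of this message, written in plain Java so it runs without Flink or Kafka on the classpath. `NullSkippingWriter`, its `serializer` and `sender` fields, and the two counters are hypothetical stand-ins for illustration, not Flink's `KafkaWriter` API. It also assumes the null comes out of the serialization step (the stack trace shows the exception being raised inside `KafkaProducer.send`), so the guard checks the serialized record rather than the input element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical stand-in for a sink writer. It only illustrates the
 * "skip when the serializer returns null" guard; it is not Flink code.
 */
final class NullSkippingWriter<IN, R> {
    private final Function<IN, R> serializer; // assumed: returns null for malformed input
    private final Consumer<R> sender;         // assumed: plays the role of producer.send(...)
    private long sent;
    private long skipped;

    NullSkippingWriter(Function<IN, R> serializer, Consumer<R> sender) {
        this.serializer = serializer;
        this.sender = sender;
    }

    void write(IN element) {
        final R record = serializer.apply(element);
        if (record == null) {
            // Nothing to send: count the element as skipped and return early,
            // so the sender never sees a null record.
            skipped++;
            return;
        }
        sender.accept(record);
        sent++;
    }

    long sentCount() {
        return sent;
    }

    long skippedCount() {
        return skipped;
    }
}
```

With a serializer that maps malformed input to null, only the well-formed elements reach the sender and the rest are counted as skipped. Whether the same guard can be applied by subclassing the real `KafkaWriter` depends on its visibility and constructor, which I have not checked.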