I guess one possibility would be to extend/override the `write` method of
the KafkaWriter:


    public void write(IN element, Context context) throws IOException {
        final ProducerRecord<byte[], byte[]> record =
                recordSerializer.serialize(element, kafkaSinkContext,
        currentProducer.send(record, deliveryCallback);

so that it does nothing when the IN element is null. Would this be the only
way, really?

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
> ```
> java.lang.NullPointerException
> at
> at
> at
> at
> ```
> What would be the way to handle this?
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored, see, e.g., from
> :
> ```
> T deserialize(String topic,
>               byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an
> Returns:
> deserialized typed data; may be null
> ```

Reply via email to