I guess one possibility would be to extend/override the `write` method of
the KafkaWriter:

https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197

```
    @Override
    public void write(IN element, Context context) throws IOException {
        final ProducerRecord<byte[], byte[]> record =
                recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
    }
```

so that it does nothing when the serializer returns null for the IN
element. Would this really be the only way?
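
For what it's worth, here is a minimal stand-alone sketch of the null check
I have in mind. It is not the actual Flink code: `NullSkippingWriter`,
`serializer`, `sender` and the two counters are hypothetical stand-ins for
`KafkaWriter`, `recordSerializer` and `currentProducer.send(...)`, built on
plain JDK types so that it runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Stand-alone sketch of a writer that skips null records; NOT Flink's KafkaWriter. */
final class NullSkippingWriter<IN> {
    // Hypothetical stand-in for recordSerializer: returns null for malformed input.
    private final Function<IN, byte[]> serializer;
    // Hypothetical stand-in for currentProducer.send(record, deliveryCallback).
    private final Consumer<byte[]> sender;
    private long sent;
    private long skipped;

    NullSkippingWriter(Function<IN, byte[]> serializer, Consumer<byte[]> sender) {
        this.serializer = serializer;
        this.sender = sender;
    }

    void write(IN element) {
        final byte[] record = serializer.apply(element);
        if (record == null) {
            // Discard instead of handing null to the producer (which would throw).
            skipped++;
            return;
        }
        sender.accept(record);
        sent++;
    }

    long sentCount() {
        return sent;
    }

    long skippedCount() {
        return skipped;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        NullSkippingWriter<String> writer = new NullSkippingWriter<>(
                // Treat the empty string as "malformed" for the sake of the example.
                s -> s.isEmpty() ? null : s.getBytes(StandardCharsets.UTF_8),
                b -> out.add(new String(b, StandardCharsets.UTF_8)));
        for (String s : List.of("a", "", "b")) {
            writer.write(s);
        }
        // prints: [a, b] sent=2 skipped=1
        System.out.println(out + " sent=" + writer.sentCount()
                + " skipped=" + writer.skippedCount());
    }
}
```

Counting the skipped records separately seems useful to me, since otherwise
malformed messages would disappear silently.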

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried returning null from the serializer, but that makes the
> KafkaWriter raise an exception:
>
> ```
> java.lang.NullPointerException
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric: when reading messages, if
> the deserializer returns null, that message is simply ignored. See, e.g.,
> the javadoc at
> https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> which says:
>
> ```
> T deserialize(String topic,
>               byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an
> exception.
> Returns:
> deserialized typed data; may be null
> ```
>
