I finally created this ticket:
https://issues.apache.org/jira/browse/FLINK-29480.

Can anyone provide any feedback?

Regards,

Salva

On 2022/09/08 10:48:07 Salva Alcántara wrote:
> Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> that if for some reason messages are malformed they can simply be
> discarded? I tried by returning null in the corresponding KafkaWriter but
> that raises an exception:
>
> ```
> java.lang.NullPointerException
> at
>
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> at
>
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> at
>
org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> at
>
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> ```
>
> What would be the way to handle this?
>
> On the other hand, that seems a bit asymmetric in the sense that when
> reading messages, if the deserializer returns null, then that message is
> simply ignored, see, e.g., from
>
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html
> :
>
> ```
> T deserialize(String topic,
>               byte[] data)
> Deserialize a record value from a byte array into a value or object.
> Parameters:
> topic - topic associated with the data
> data - serialized bytes; may be null; implementations are recommended to
> handle null by returning a value or null rather than throwing an
exception.
> Returns:
> deserialized typed data; may be null
> ```
>

Reply via email to