Sorry, I meant: do nothing when the `serialize` method returns null (not when the IN element is null)...
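A minimal, self-contained sketch of the behavior proposed here follows. The writer calls the serializer and, if the result is null, skips the send instead of passing null to the producer, which is what triggers the NullPointerException in `KafkaProducer.doSend` in the trace quoted below. The sketch does not subclass or use Flink's `KafkaWriter` or any Kafka classes. `RecordSerializer`, `NullSkippingWriter`, and the `Consumer` standing in for `currentProducer.send(record, deliveryCallback)` are hypothetical names for illustration, and treating blank strings as "malformed" is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a record serializer (NOT Flink's API):
// returning null signals "skip this element".
interface RecordSerializer<IN, R> {
    R serialize(IN element);
}

// Hypothetical writer mirroring the proposed change to write():
// a null serialization result is counted and dropped instead of being sent.
final class NullSkippingWriter<IN, R> {
    private final RecordSerializer<IN, R> serializer;
    private final Consumer<R> send; // stand-in for currentProducer.send(record, deliveryCallback)
    private long numRecordsSent = 0;
    private long numRecordsDropped = 0;

    NullSkippingWriter(RecordSerializer<IN, R> serializer, Consumer<R> send) {
        this.serializer = serializer;
        this.send = send;
    }

    void write(IN element) {
        R record = serializer.serialize(element);
        if (record == null) {
            numRecordsDropped++; // do nothing else: never hand null to the producer
            return;
        }
        send.accept(record);
        numRecordsSent++;
    }

    long sent() { return numRecordsSent; }
    long dropped() { return numRecordsDropped; }
}

public class Main {
    public static void main(String[] args) {
        List<byte[]> outbox = new ArrayList<>();
        // Assumption for illustration: blank strings are "malformed" and serialize to null.
        RecordSerializer<String, byte[]> serializer =
                s -> s.isBlank() ? null : s.getBytes(StandardCharsets.UTF_8);
        NullSkippingWriter<String, byte[]> writer = new NullSkippingWriter<>(serializer, outbox::add);
        for (String s : List.of("a", " ", "b")) {
            writer.write(s);
        }
        // prints "2 sent, 1 dropped"
        System.out.println(writer.sent() + " sent, " + writer.dropped() + " dropped");
    }
}
```

Counting dropped records, in the same spirit as `numRecordsSendCounter`, keeps the skips observable instead of silently losing malformed messages.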
On 2022/09/08 15:52:48 Salva Alcántara wrote:
> I guess one possibility would be to extend/override the `write` method of
> the KafkaWriter:
>
> https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>
> ```
> @Override
> public void write(IN element, Context context) throws IOException {
>     final ProducerRecord<byte[], byte[]> record =
>             recordSerializer.serialize(element, kafkaSinkContext,
>                     context.timestamp());
>     currentProducer.send(record, deliveryCallback);
>     numRecordsSendCounter.inc();
> }
> ```
>
> so that it does nothing when the IN element is null. Would this be the only
> way, really?
>
> On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> > that if for some reason messages are malformed they can simply be
> > discarded? I tried by returning null in the corresponding KafkaWriter but
> > that raises an exception:
> >
> > ```
> > java.lang.NullPointerException
> >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > ```
> >
> > What would be the way to handle this?
> >
> > On the other hand, that seems a bit asymmetric in the sense that when
> > reading messages, if the deserializer returns null, then that message is
> > simply ignored, see, e.g., from
> > https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
> >
> > ```
> > T deserialize(String topic, byte[] data)
> > Deserialize a record value from a byte array into a value or object.
> >
> > Parameters:
> > topic - topic associated with the data
> > data - serialized bytes; may be null; implementations are recommended to
> > handle null by returning a value or null rather than throwing an exception.
> > Returns:
> > deserialized typed data; may be null
> > ```