Can't you add a flatMap function just before the Sink that does exactly this verification and filters out everything that is not supposed to be sent downstream?
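
To make the idea concrete, here is a minimal sketch of such a "validate, then emit or drop" step. It is plain Java so it can be run without a cluster: `java.util.function.Consumer` stands in for Flink's `Collector`, and `DropMalformedSketch`, `serializeOrNull` and the "blank string is malformed" rule are hypothetical names and assumptions of mine, not anything taken from your job. In a real job the same logic would sit in a `FlatMapFunction`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: Consumer<T> plays the role of Flink's Collector<T>.
class DropMalformedSketch {

    // Hypothetical validation + serialization: returns null when the
    // record is considered malformed (here: null or blank strings).
    static byte[] serializeOrNull(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // flatMap-style step: emits zero or one element per input.
    static void flatMap(String value, Consumer<byte[]> out) {
        byte[] payload = serializeOrNull(value);
        if (payload != null) {
            out.accept(payload);
        }
        // Malformed input: emit nothing, so the sink never sees it.
    }

    // Applies the step to a batch and returns what would reach the sink.
    static List<byte[]> run(List<String> inputs) {
        List<byte[]> emitted = new ArrayList<>();
        for (String input : inputs) {
            flatMap(input, emitted::add);
        }
        return emitted;
    }
}
```

With something like this in front of the sink (roughly `stream.flatMap(...).sinkTo(kafkaSink)`), the serializer passed to the KafkaSink only ever receives records that already passed validation, so it never needs to return null.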

Best,
Alexander Fedulov

On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara <salcantara...@gmail.com> wrote:

> Sorry I meant do nothing when the serialize method returns null...
>
> On 2022/09/08 15:52:48 Salva Alcántara wrote:
> > I guess one possibility would be to extend/override the `write` method of
> > the KafkaWriter:
> >
> > https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
> >
> > ```
> > @Override
> > public void write(IN element, Context context) throws IOException {
> >     final ProducerRecord<byte[], byte[]> record =
> >             recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
> >     currentProducer.send(record, deliveryCallback);
> >     numRecordsSendCounter.inc();
> > }
> > ```
> >
> > so that it does nothing when the IN element is null. Would this be the only
> > way, really?
> >
> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
> > > Hi! Is there a way to skip/discard messages when using the KafkaSink, so
> > > that if for some reason messages are malformed they can simply be
> > > discarded? I tried by returning null in the corresponding KafkaWriter but
> > > that raises an exception:
> > >
> > > ```
> > > java.lang.NullPointerException
> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> > >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
> > >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> > > ```
> > >
> > > What would be the way to handle this?
> > >
> > > On the other hand, that seems a bit asymmetric in the sense that when
> > > reading messages, if the deserializer returns null, then that message is
> > > simply ignored, see, e.g., from
> > > https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
> > >
> > > ```
> > > T deserialize(String topic,
> > >               byte[] data)
> > > Deserialize a record value from a byte array into a value or object.
> > > Parameters:
> > > topic - topic associated with the data
> > > data - serialized bytes; may be null; implementations are recommended to
> > > handle null by returning a value or null rather than throwing an exception.
> > > Returns:
> > > deserialized typed data; may be null
> > > ```