Yeah, that would be an option, but it would be nicer if I could simply skip events that fail to be serialized without prepending any operator to the sink, since, conceptually, that is not really part of the pipeline but rather about handling serialization errors.
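For the record, the workaround you suggest would look roughly like this (just a sketch; the brokers, topic and the `isSerializable` check are made up for illustration):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterBeforeSinkExample {

    // Hypothetical stand-in for "would this event fail to be serialized?"
    private static boolean isSerializable(String event) {
        return event != null && !event.isEmpty();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("ok", "", "also-ok");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        // The suggested workaround: drop problematic events before they reach the sink.
        events.filter(FilterBeforeSinkExample::isSerializable).sinkTo(sink);

        env.execute("filter-before-sink");
    }
}
```

It works, but the filtering concern then lives in the pipeline graph instead of in the (de)serialization layer, which is what I'd like to avoid.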
If I'm not mistaken, what I'm asking is entirely possible when reading/deserializing messages (without having to append a filter to discard invalid messages before sending them downstream). E.g., from the generic DeserializationSchema interface:

```
// To be overridden by the user
T deserialize(byte[] message) throws IOException;

@PublicEvolving
default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialize = deserialize(message);
    if (deserialize != null) {
        out.collect(deserialize);
    }
}
```

The more specialized interface KafkaDeserializationSchema is implemented identically:

```
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
    T deserialized = deserialize(message);
    if (deserialized != null) {
        out.collect(deserialized);
    }
}
```

So one can simply return `null` in the overridden `deserialize` method and those messages will be automatically filtered out. If instead one uses the more recent KafkaRecordDeserializationSchema interface, with

```
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
```

it's possible to simply not call `out.collect` on the records that should be skipped (see the sketch at the very end of this message for a concrete example).

In general, this gives a flexibility that is lost when writing/serializing messages, resulting in a somewhat inconsistent/asymmetric behaviour when one looks at the KafkaWriter used by the KafkaSink:

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
}
```

where it's not possible to skip records if desired. On the other hand, it's not currently possible to pass a custom writer with a different behaviour to the KafkaSink, e.g.,

```
@Override
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    if (record != null) { // skip null records
        currentProducer.send(record, deliveryCallback);
        numRecordsSendCounter.inc();
    }
}
```

Isn't the above implementation more consistent with the deserialization case (and also more powerful) than the current one?

On Thu, Sep 8, 2022 at 10:56 PM Alexander Fedulov <a...@deltastream.io> wrote:

> Can't you add a flatMap function just before the Sink that does exactly
> this verification and filters out everything that is not supposed to be
> sent downstream?
>
> Best,
> Alexander Fedulov
>
> On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Sorry I meant do nothing when the serialize method returns null...
>>
>> On 2022/09/08 15:52:48 Salva Alcántara wrote:
>> > I guess one possibility would be to extend/override the `write` method
>> > of the KafkaWriter:
>> >
>> > https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197
>> >
>> > ```
>> > @Override
>> > public void write(IN element, Context context) throws IOException {
>> >     final ProducerRecord<byte[], byte[]> record =
>> >             recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
>> >     currentProducer.send(record, deliveryCallback);
>> >     numRecordsSendCounter.inc();
>> > }
>> > ```
>> >
>> > so that it does nothing when the IN element is null. Would this be the
>> > only way, really?
>> >
>> > On 2022/09/08 10:48:07 Salva Alcántara wrote:
>> > > Hi! Is there a way to skip/discard messages when using the KafkaSink,
>> > > so that if for some reason messages are malformed they can simply be
>> > > discarded? I tried by returning null in the corresponding KafkaWriter
>> > > but that raises an exception:
>> > >
>> > > ```
>> > > java.lang.NullPointerException
>> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>> > >     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>> > >     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>> > > ```
>> > >
>> > > What would be the way to handle this?
>> > >
>> > > On the other hand, that seems a bit asymmetric in the sense that when
>> > > reading messages, if the deserializer returns null, then that message
>> > > is simply ignored, see, e.g., from
>> > > https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html:
>> > >
>> > > ```
>> > > T deserialize(String topic, byte[] data)
>> > > Deserialize a record value from a byte array into a value or object.
>> > > Parameters:
>> > >     topic - topic associated with the data
>> > >     data - serialized bytes; may be null; implementations are recommended
>> > >     to handle null by returning a value or null rather than throwing an
>> > >     exception.
>> > > Returns:
>> > >     deserialized typed data; may be null
>> > > ```
>> > >
>> >
>>
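PS: For completeness, this is roughly what skipping records already looks like on the read side with KafkaRecordDeserializationSchema (just a sketch; the class name and the "empty value means malformed" check are placeholders for whatever validation/parsing one actually needs):

```
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Sketch of a deserialization schema that silently drops malformed records
 * by simply not calling out.collect on them.
 */
public class SkippingDeserializationSchema implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
        byte[] value = record.value();
        if (value == null || value.length == 0) {
            // Treat empty values as malformed: skip them by not emitting anything.
            return;
        }
        out.collect(new String(value, StandardCharsets.UTF_8));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

It can then be plugged into the source via `KafkaSource.<String>builder()...setDeserializer(new SkippingDeserializationSchema())`. This is the kind of flexibility I'm missing on the write/serialization side.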