RE: Skip malformed messages with the KafkaSink

2022-10-24 Thread Salva Alcántara
I finally created this ticket: https://issues.apache.org/jira/browse/FLINK-29480. Can anyone provide any feedback? Regards, Salva On 2022/09/08 10:48:07 Salva Alcántara wrote: > Hi! Is there a way to skip/discard messages when using the KafkaSink, so > that if for some reason messages are malfo

Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Yeah, that would be an option. but it would be just nicer if I could simply skip events which fail to be serialized without prepending any operator to the sink since, conceptually, that is not really part of the pipeline but more about handling serialization errors. If I'm not mistaken, what I'm a

Re: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Alexander Fedulov
Can't you add a flatMap function just before the Sink that does exactly this verification and filters out everything that is not supposed to be sent downstream? Best, Alexander Fedulov On Thu, Sep 8, 2022 at 6:15 PM Salva Alcántara wrote: > Sorry I meant do nothing when the serialize method ret

RE: RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Sorry I meant do nothing when the serialize method returns null... On 2022/09/08 15:52:48 Salva Alcántara wrote: > I guess one possibility would be to extend/override the `write` method of > the KafkaWriter: > > https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-co

RE: Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
I guess one possibility would be to extend/override the `write` method of the KafkaWriter: https://github.com/apache/flink/blob/26eeabfdd1f5f976ed1b5d761a3469bbcb7d3223/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197 ``` @Overri

Skip malformed messages with the KafkaSink

2022-09-08 Thread Salva Alcántara
Hi! Is there a way to skip/discard messages when using the KafkaSink, so that if for some reason messages are malformed they can simply be discarded? I tried by returning null in the corresponding KafkaWriter but that raises an exception: ``` java.lang.NullPointerException at org.apache.kafka.clie