Hi Yuval,

Here are some workarounds:
One option is to use a tombstone record (null payload) and filter those records out downstream. If the topic is log-compacted, Kafka will also remove them on compaction.

A second option is to translate the Row to a byte[] in a separate flatMap (emitting 0 records on error) and then write the byte[] directly to Kafka with a trivial pass-through schema.

A third option is to wrap the sink or KafkaProducer and catch the exception (possibly using a custom exception type for clarity).

Rough, untested sketches of the last two ideas are included at the end of this mail.

On Thu, Sep 24, 2020 at 3:00 PM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Yuval,
> thanks for bringing this issue up. You're right: there is no error handling
> currently implemented for SerializationSchema. FLIP-124 [1] addressed this
> for the DeserializationSchema, though. I created FLINK-19397 [2] to cover
> this feature.
>
> In the meantime, I cannot think of any other solution than filtering those
> rows out in a step before emitting the data to Kafka.
>
> Best,
> Matthias
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
> [2] https://issues.apache.org/jira/browse/FLINK-19397
>
> On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using a custom KafkaSerializationSchema to write records to Kafka
>> using FlinkKafkaProducer. The objects written are Rows coming from
>> Flink's SQL API.
>>
>> In some cases, when trying to convert the Row object to a byte[],
>> serialization will fail due to malformed values. In such cases, I would
>> like the custom serialization schema to drop the bad records and not
>> send them through.
>>
>> From the API, it is unclear how such failures should be handled, given
>> the following signature:
>>
>> ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
>>
>> From reading the code, there's no exception handling or null checking,
>> which means that:
>>
>> - If an exception is thrown, it will cause the entire job to fail (this
>> has happened to me in production).
>> - If null is returned, a null value will be pushed down to
>> kafkaProducer.send, which is undesirable.
>>
>> What are the options here?
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.

--
Arvid Heise | Senior Java Developer
Ververica GmbH <https://www.ververica.com/>
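
Here is a rough sketch of the second option: do the risky Row -> byte[] conversion in a flatMap before the sink and drop anything that fails, then hand the ready-made bytes to a trivial pass-through schema. The helper encodeRow and the topic name "my-topic" are placeholders for your own logic; treat this as an untested sketch, not a drop-in implementation.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

// Step 1: convert Row -> byte[] before the sink; a record that fails to
// serialize is simply not emitted, so the job keeps running.
class RowToBytes implements FlatMapFunction<Row, byte[]> {
    @Override
    public void flatMap(Row row, Collector<byte[]> out) {
        try {
            out.collect(encodeRow(row)); // your existing serialization logic
        } catch (Exception e) {
            // dropped on purpose; log or count the bad record here if needed
        }
    }

    private byte[] encodeRow(Row row) {
        return row.toString().getBytes(); // placeholder
    }
}

// Step 2: the schema handed to FlinkKafkaProducer only wraps the ready-made
// bytes, so it can no longer throw on malformed values.
class PassThroughSchema implements KafkaSerializationSchema<byte[]> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(byte[] value, Long timestamp) {
        return new ProducerRecord<>("my-topic", value); // topic is a placeholder
    }
}

You would then wire it up roughly as
rows.flatMap(new RowToBytes()).addSink(new FlinkKafkaProducer<>("my-topic", new PassThroughSchema(), props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));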
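
If you prefer to keep everything inside the serialization schema, the first and third ideas can be combined: wrap the existing schema, catch the failure, and fall back to a tombstone (null payload) that consumers filter out and that compaction eventually removes. Again an untested sketch; the delegate and topic are placeholders, and for a compacted topic you would want to carry over the original key if you can still derive it.

import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

// Wraps a real schema and turns serialization failures into tombstones
// instead of failing the job.
class TombstoneOnErrorSchema implements KafkaSerializationSchema<Row> {

    private final KafkaSerializationSchema<Row> delegate;
    private final String topic;

    TombstoneOnErrorSchema(KafkaSerializationSchema<Row> delegate, String topic) {
        this.delegate = delegate;
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row element, @Nullable Long timestamp) {
        try {
            return delegate.serialize(element, timestamp);
        } catch (Exception e) {
            // Null value = tombstone: consumers can skip it, and log compaction
            // will eventually drop it. Never return null for the record itself,
            // or a null ProducerRecord would reach kafkaProducer.send.
            return new ProducerRecord<>(topic, null, null);
        }
    }
}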