You could forward the tombstone records to a “graveyard” topic; that way they
won't confuse anyone reading from the regular topic.
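
A rough sketch of what that could look like, assuming a hypothetical toBytes(Row) conversion and made-up topic names; on failure the record is routed to the graveyard topic instead of the main one:

import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: records that fail to serialize go to a "graveyard" topic so
// that readers of the main topic never see them. Topic names and toBytes()
// are placeholders.
public class GraveyardRoutingSchema implements KafkaSerializationSchema<Row> {
    private static final String MAIN_TOPIC = "events";                // assumption
    private static final String GRAVEYARD_TOPIC = "events-graveyard"; // assumption

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
        try {
            return new ProducerRecord<>(MAIN_TOPIC, toBytes(row));
        } catch (Exception e) {
            // keep a best-effort string form so the bad record can be inspected later
            return new ProducerRecord<>(GRAVEYARD_TOPIC,
                    row.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    private byte[] toBytes(Row row) {
        // placeholder for the user-specific conversion that may throw
        throw new UnsupportedOperationException("placeholder");
    }
}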


________________________________
From: Yuval Itzchakov <yuva...@gmail.com>
Sent: Thursday, September 24, 2020 11:50:28 AM
To: Arvid Heise <ar...@ververica.com>
Cc: Matthias Pohl <matth...@ververica.com>; user <user@flink.apache.org>
Subject: Re: Ignoring invalid values in KafkaSerializationSchema

Hi Arvid,

Thanks for the response:

The topic is not log-compacted, and these invalid values are not actually
tombstones; I wouldn't want anyone to misinterpret them as such.

Regarding filtering the rows in a separate flatMap, that's a great idea. The only
problem is that the rows are opaque from the perspective of the stream (they
could literally come from any SQL table), so traversing them via the RowTypeInfo
can end up being expensive.

Wrapping the KafkaProducer is the path I had in mind. Ideally, though, I think
exception handling should be part of the SerializationSchema contract, since
things may eventually fail, and having a clear way to drop such values would be
great.



On Thu, Sep 24, 2020, 18:44 Arvid Heise <ar...@ververica.com> wrote:
Hi Yuval,

Here are some workarounds.

One option is to use a tombstone record (a null or 0-byte payload) and filter it
downstream. If the topic is log-compacted, Kafka would also remove the null-valued
ones on compaction.
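
A minimal sketch of that first option, with a made-up topic name and a placeholder toBytes(Row); a failed conversion turns into a record with a null payload that consumers can filter out:

import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: on serialization failure, emit a record with a null payload
// that downstream consumers filter out (and that log compaction would
// eventually remove on a compacted topic). Topic name and toBytes() are
// placeholders.
public class TombstoneOnErrorSchema implements KafkaSerializationSchema<Row> {
    private static final String TOPIC = "events"; // assumption

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
        byte[] payload;
        try {
            payload = toBytes(row);
        } catch (Exception e) {
            payload = null; // tombstone marker for downstream filtering
        }
        return new ProducerRecord<>(TOPIC, payload);
    }

    private byte[] toBytes(Row row) {
        // placeholder for the user-specific conversion that may throw
        throw new UnsupportedOperationException("placeholder");
    }
}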

A second option is to translate the Row to a byte[] in a separate flatMap
(emitting 0 records on error) and then simply write the byte[] directly to Kafka
in the schema.
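
Roughly, and again with placeholders for the actual conversion and the topic name, that could look like:

import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: serialization errors are handled in the flatMap, so the
// schema itself can no longer fail. toBytes() and the topic are placeholders.
public class RowToBytes implements FlatMapFunction<Row, byte[]> {
    @Override
    public void flatMap(Row row, Collector<byte[]> out) {
        try {
            out.collect(toBytes(row));
        } catch (Exception e) {
            // emit nothing, i.e. drop the malformed record (optionally log/count it)
        }
    }

    private byte[] toBytes(Row row) {
        // placeholder for the user-specific conversion that may throw
        throw new UnsupportedOperationException("placeholder");
    }
}

// Pass-through schema that only wraps the already-serialized bytes.
class BytesSchema implements KafkaSerializationSchema<byte[]> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(byte[] value, @Nullable Long timestamp) {
        return new ProducerRecord<>("events", value); // topic name is an assumption
    }
}

Wiring it up would then be something along the lines of rows.flatMap(new RowToBytes()) followed by a FlinkKafkaProducer that uses BytesSchema.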

A third option is to wrap the sink or the KafkaProducer and catch the exception
(possibly using a custom exception type for clarity).
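
The catch-and-skip part could hinge on a dedicated exception type, e.g. something like the following (the name is made up, and the wrapper itself, which would also have to forward FlinkKafkaProducer's checkpointing hooks, is left out of this sketch):

// Hypothetical marker exception a serialization schema could throw so that a
// wrapping sink can tell "skip this record" apart from genuine failures.
public class RecordSkippedException extends RuntimeException {
    public RecordSkippedException(String message, Throwable cause) {
        super(message, cause);
    }
}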

On Thu, Sep 24, 2020 at 3:00 PM Matthias Pohl <matth...@ververica.com> wrote:
Hi Yuval,
thanks for bringing this issue up. You're right: There is no error handling 
currently implemented for SerializationSchema. FLIP-124 [1] addressed this for 
the DeserializationSchema, though. I created FLINK-19397 [2] to cover this 
feature.
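
For comparison, FLIP-124 gave DeserializationSchema a Collector-based deserialize() where a bad record can be skipped by simply not emitting anything. Roughly along these lines (the parsing itself is just a placeholder):

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Sketch of the Collector-based variant added with FLIP-124: a malformed
// message is skipped by not calling out.collect(). parse() is a placeholder.
public class SkippingDeserializationSchema implements DeserializationSchema<String> {

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        try {
            out.collect(parse(message));
        } catch (Exception e) {
            // drop the malformed message
        }
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return parse(message); // single-record variant still required by the interface
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    private String parse(byte[] message) {
        // placeholder for the actual parsing, which may throw
        throw new UnsupportedOperationException("placeholder");
    }
}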

In the meantime, I cannot think of any other solution than filtering those rows 
out in a step before emitting the data to Kafka.

Best,
Matthias

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988
[2] https://issues.apache.org/jira/browse/FLINK-19397

On Wed, Sep 23, 2020 at 1:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
Hi,

I'm using a custom KafkaSerializationSchema to write records to Kafka using 
FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL API.

In some cases, when trying to convert the Row object to a byte[], serialization 
will fail due to malformed values. In such cases, I would like the custom 
serialization schema to drop the bad records and not send them through.

From the API, it is unclear how such failures should be handled. Given the
following signature:

 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

From reading the code, there's no exception handling or null checking, which
means that:

- If an exception is thrown, it will cause the entire job to fail (this has
happened to me in production).
- If null is returned, it will be passed down to kafkaProducer.send as-is,
which is undesirable.
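
For illustration, a stripped-down version of the kind of schema I mean (the conversion logic and the topic name are just placeholders):

import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the situation: toBytes() may throw on malformed values, and
// neither throwing nor returning null gives a clean way to skip the record.
public class RowSerializationSchema implements KafkaSerializationSchema<Row> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
        // If this throws, the whole job fails; if we returned null instead,
        // that null would be handed straight to kafkaProducer.send().
        return new ProducerRecord<>("some-topic", toBytes(row));
    }

    private byte[] toBytes(Row row) {
        // placeholder for the real conversion, which may throw
        throw new UnsupportedOperationException("placeholder");
    }
}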

What are the options here?



--
Best Regards,
Yuval Itzchakov.



--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng
