Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Alexey Trenikhun
Hi Arvid, Thanks for the response: The topic is not log compacted and these invalid values are not actually tombstones; I wouldn't want anyone to misinterpret them as such. Regarding filtering the rows in a separate flatMap, that's a great idea.

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Yuval Itzchakov
Hi Arvid, Thanks for the response: The topic is not log compacted and these invalid values are not actually tombstones; I wouldn't want anyone to misinterpret them as such. Regarding filtering the rows in a separate flatMap, that's a great idea. The only problem is that the rows are opaque from the …

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Arvid Heise
Hi Yuval, Here are some workarounds. One option is to use a tombstone record (0-byte payload) and filter it downstream. If the topic is log-compacted, Kafka would also filter them out on compaction. A second option is to actually translate the Row to a byte[] array in a separate flatMap (returning 0 records on error).
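
A minimal sketch of the second workaround, assuming a hypothetical encodeRow helper (the real encoding logic from the job is not shown in the thread): the Row-to-bytes conversion, and therefore the error handling, moves into a flatMap, so the Kafka sink only ever sees already-valid byte arrays and its serialization schema can no longer fail.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Pre-serialization step: convert each Row to bytes here and drop rows
// that fail, instead of letting the Kafka sink crash on them.
public class RowToBytesFlatMap implements FlatMapFunction<Row, byte[]> {

    @Override
    public void flatMap(Row row, Collector<byte[]> out) {
        try {
            // encodeRow is a hypothetical converter that throws on malformed values.
            out.collect(encodeRow(row));
        } catch (Exception e) {
            // Emit nothing: the invalid row is skipped ("0 records on error").
        }
    }

    private byte[] encodeRow(Row row) throws Exception {
        // ... the job's actual conversion logic would go here ...
        return row.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}

Downstream, the KafkaSerializationSchema<byte[]> then just wraps the pre-encoded payload in a ProducerRecord and has no failure path of its own.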

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Matthias Pohl
Hi Yuval, thanks for bringing this issue up. You're right: there is currently no error handling implemented for SerializationSchema. FLIP-124 [1] addressed this for the DeserializationSchema, though. I created FLINK-19397 [2] to cover this feature. In the meantime, I cannot think of any other solution.
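
For contrast, a sketch of what FLIP-124 enables on the read side (decodeRow is a hypothetical helper): the Collector-based deserialize() variant lets a DeserializationSchema drop a malformed record by emitting nothing, which is exactly the hook FLINK-19397 asks for on the serialization side.

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// FLIP-124 style: the Collector variant lets a deserializer
// skip malformed records by simply emitting nothing.
public class LenientRowDeserializer implements DeserializationSchema<Row> {

    @Override
    public Row deserialize(byte[] message) throws IOException {
        // Not used once the Collector variant below is overridden.
        return null;
    }

    @Override
    public void deserialize(byte[] message, Collector<Row> out) throws IOException {
        try {
            out.collect(decodeRow(message)); // hypothetical decoder, may throw
        } catch (Exception e) {
            // Skip the record. No equivalent hook exists yet for
            // SerializationSchema, which is what FLINK-19397 covers.
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return TypeInformation.of(Row.class);
    }

    private Row decodeRow(byte[] message) throws Exception {
        // ... actual decoding logic would go here ...
        return Row.of(new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }
}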

Ignoring invalid values in KafkaSerializationSchema

2020-09-23 Thread Yuval Itzchakov
Hi, I'm using a custom KafkaSerializationSchema to write records to Kafka with FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL API. In some cases, when trying to convert the Row object to a byte[], serialization will fail due to malformed values. In such cases, I would like to ignore these invalid values.
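
For context, a minimal sketch of the setup being described (the topic name and the encodeRow helper are hypothetical placeholders, not from the original post): when encodeRow throws on a malformed Row, the exception propagates out of serialize(), and the producer task fails, because the interface offers no way to skip a record.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the schema described above: serialize() has no supported
// way to say "skip this record", so a malformed Row fails the sink.
public class RowKafkaSchema implements KafkaSerializationSchema<Row> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Row row, Long timestamp) {
        // encodeRow is a hypothetical converter that throws on malformed
        // values; the exception propagates and fails the job.
        return new ProducerRecord<>("output-topic", encodeRow(row));
    }

    private byte[] encodeRow(Row row) {
        // ... the actual Row-to-bytes conversion would go here ...
        return row.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}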