invalid values in KafkaSerializationSchema
Hi Arvid,
Thanks for the response.
The topic is not log-compacted, and these invalid values are not actually
tombstones; I wouldn't want anyone to misinterpret them as such.
Regarding filtering the rows in a separate flatMap, that's a great idea.
The only problem is that the rows are opaque from the
Hi Yuval,
Here are some workarounds.
One option is to use a tombstone record (0-byte payload) and filter it
downstream. If the topic is log-compacted, Kafka would remove them on compaction.
The second option is to actually translate the Row to a byte[] in a
separate flatMap (returning 0 records on err
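A minimal sketch of that second option, written in plain Java so it runs without Flink on the classpath. Everything here is hypothetical: the class name SafeRowSerializer, the Object[] stand-in for a Row, and the comma-joined encoding are illustration only, and a Consumer<byte[]> plays the role that a Collector plays in a real flatMap. The idea is only that serialization happens before the sink, and a record that fails to serialize emits nothing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a flatMap that turns rows into byte[] payloads.
class SafeRowSerializer {

    // Assumed serialization logic: joins fields with commas and rejects nulls.
    // A real job would call its own Row -> byte[] conversion here.
    static byte[] serialize(Object[] row) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.length; i++) {
            if (row[i] == null) {
                throw new IllegalArgumentException("malformed value at field " + i);
            }
            if (i > 0) {
                sb.append(',');
            }
            sb.append(row[i]);
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // flatMap-shaped method: emits one payload on success, zero on failure.
    static void flatMap(Object[] row, Consumer<byte[]> out) {
        byte[] payload;
        try {
            payload = serialize(row);
        } catch (IllegalArgumentException e) {
            // Malformed row: emit nothing, so the sink never sees it.
            return;
        }
        out.accept(payload);
    }

    // Convenience helper that applies flatMap to a batch of rows.
    static List<byte[]> serializeAll(List<Object[]> rows) {
        List<byte[]> result = new ArrayList<>();
        for (Object[] row : rows) {
            flatMap(row, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"a", 1});
        rows.add(new Object[] {"b", null}); // malformed, will be dropped
        rows.add(new Object[] {"c", 3});
        for (byte[] payload : serializeAll(rows)) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

With this shape, the schema passed to the producer only has to wrap an already-valid byte[], so it has no failure case of its own left to handle.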
Hi Yuval,
Thanks for bringing this issue up. You're right: there is no error handling
currently implemented for SerializationSchema. FLIP-124 [1] addressed this
for the DeserializationSchema, though. I created FLINK-19397 [2] to cover
this feature.
In the meantime, I cannot think of any other solu
Hi,
I'm using a custom KafkaSerializationSchema to write records to Kafka using
FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL
API.
In some cases, when trying to convert the Row object to a byte[],
serialization will fail due to malformed values. In such cases, I would
l