Hi Ben,
if I remember correctly, this topic came up a couple of times. But we
haven't implemented it yet, the existing implementation can be easily
adapted for that. The "target topic" would be an additional persisted
metadata column in SQL terms. All you need to do is to adapt
DynamicKafkaSerializationSchema
KafkaDynamicSink
for that.
I opened https://issues.apache.org/jira/browse/FLINK-22748 to discuss
this further.
I hope this helps.
Regards,
Timo
On 20.05.21 12:43, Benoît Paris wrote:
Hi all!
I'm looking for a way to write to different Kafka topics based on some
column value in SQL.
I think it's possible with Java, using KafkaSerializationSchema,
and ProducerRecord(topic, ...), but I was wondering if I could somewhat
access that feature in SQL.
I'm also trying to evaluate the amount of work required so that I
implement it myself, subclassing the Kafka SQL connector just to add
that feature.
Another alternative for me is to try to preprocess the SQL, detect Kafka
Sinks, force a DataStream conversion, then replace the Kafka SQL sink
with an equivalent DataStream that has the topic routing. (but this
feels rather brittle and maintenance-hard to me, rather than having the
option in the SQL sink configuration)
All comments/opinions/advice welcome!
Cheers
Ben
||