Hi Ben,

If I remember correctly, this topic has come up a couple of times, but we haven't implemented it yet. The existing implementation can easily be adapted for it: the "target topic" would be an additional persisted metadata column, in SQL terms. All you need to do is adapt

DynamicKafkaSerializationSchema

KafkaDynamicSink

for that.
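To make the idea concrete, here is a rough sketch (not the actual Flink internals) of how the serialize() method in DynamicKafkaSerializationSchema could pick the topic per record; the field position "topicMetadataPos" and the key/value projection calls are assumptions for illustration:

```java
// Hypothetical sketch: route each row to the topic stored in a persisted
// metadata column. Assumes the planner has told us the position of that
// column in the consumed RowData.
@Override
public ProducerRecord<byte[], byte[]> serialize(RowData row, Long timestamp) {
    // Read the target topic from the assumed metadata column position.
    String targetTopic = row.getString(topicMetadataPos).toString();
    // keySerialization/valueSerialization as in the existing implementation.
    byte[] key = keySerialization == null ? null : keySerialization.serialize(row).array();
    byte[] value = valueSerialization.serialize(row);
    return new ProducerRecord<>(targetTopic, null, key, value);
}
```

KafkaDynamicSink would additionally need to declare the new metadata key so the planner projects the column into the row and excludes it from the physical payload.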

I opened https://issues.apache.org/jira/browse/FLINK-22748 to discuss this further.

I hope this helps.

Regards,
Timo


On 20.05.21 12:43, Benoît Paris wrote:
Hi all!

I'm looking for a way to write to different Kafka topics based on some column value in SQL.

I think it's possible with Java, using KafkaSerializationSchema, and ProducerRecord(topic, ...), but I was wondering if I could somewhat access that feature in SQL.
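For reference, a minimal sketch of that DataStream-level approach, assuming Flink's KafkaSerializationSchema (pre-1.14 producer API); the "Event" type and its accessors are made up for illustration:

```java
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

// Sketch: choose the destination topic from a field of the record itself,
// instead of the single topic configured on the producer.
public class TopicRoutingSchema implements KafkaSerializationSchema<Event> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Event event, Long timestamp) {
        // Derive the topic name from a column value (hypothetical accessor).
        String topic = "events-" + event.getRegion();
        byte[] value = event.toJson().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}
```

The schema is then passed to FlinkKafkaProducer, which honors the topic set on each ProducerRecord.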

I'm also trying to evaluate the amount of work required to implement it myself, by subclassing the Kafka SQL connector just to add that feature.

Another alternative for me is to preprocess the SQL, detect Kafka sinks, force a DataStream conversion, and then replace the Kafka SQL sink with an equivalent DataStream sink that does the topic routing. (But this feels rather brittle and hard to maintain, compared to having the option in the SQL sink configuration.)

All comments/opinions/advice welcome!
Cheers
Ben
