Thank you, Ejaskhan. I think your suggestion would only work if all the topics were on the same Kafka cluster. In my use-case, the topics can be on different clusters, which is why I was thinking of rolling a custom sink that detects config changes and instantiates Kafka producers on demand as needed.
On Tuesday, April 20, 2021, 01:11:08 PM PDT, Ejaskhan S <iamejask...@gmail.com> wrote: Hi Ahmed, If you want to dynamically produce events to different topics and you have the logic to identify the target topics, you will be able to achieve this in the following way. - Suppose this is your event after the transformation logic(if any) : EVENT. - This is the target topic for this event, TOPIC_1. ( I hope, you have the logic available to identify the topic dynamically) - Create a new dataStream(custom DS) containing the folllowing attributes, topicName and event. class TransformedEvent implements java.io.Serializable {String topicName;Event event;} - Create the serialization schema for the topic as below, class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>, KafkaContextAware<TransformedEvent> { @Override public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element, @Nullable Long timestamp) { byte[] serialized = new customSerliazer().serialize(element.getEvent()); return new ProducerRecord<>(getTargetTopic(element), null, null, null, serialized); } @Override public String getTargetTopic(TransformedEvent element) { return element.getTopicName(); } } - Create the producer as below, FlinkKafkaProducer<TransformedEvent> producer= new FlinkKafkaProducer<>( "DEFAULT", new CustomKafkaSchema(), producerConfiguration , FlinkKafkaProducer.Semantic.EXACTLY_ONCE/ AT_LEAST_ONCE); ThanksEjas khan On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid <ahmedaha...@yahoo.com> wrote: Hello everyone, I have a use-case where I need to have a Flink application produce to a variable number of Kafka topics (specified through configuration), potentially in different clusters, without having to redeploy the app. Let's assume I maintain the set of destination clusters/topics in config files, and have code in my Flink app to detect and reload any changes in these config files at runtime. I have two questions: - Is that a sound/reasonable thing to do? Or is it going to be riddled with issues? - To implement that, should I write a custom SinkFunction that maintains a set of Kafka producers? Or a custom SinkFunction that delegates the work to a collection of FlinkKafkaProducer instances? Is there a better approach? Thanks in advance. Truly,Ahmed