Hi Ahmed,

If you want to produce events to different topics dynamically, and you already have the logic to identify the target topic for each event, you can do it as follows.
- Suppose this is your event after the transformation logic (if any): EVENT.
- Suppose this is the target topic for this event: TOPIC_1. (I assume you already have the logic to identify the topic dynamically.)
- Create a new DataStream (a custom stream) whose elements carry the following attributes, topicName and event:

    class TransformedEvent implements java.io.Serializable {
        String topicName;
        Event event;

        String getTopicName() { return topicName; }
        Event getEvent() { return event; }
    }

- Create the serialization schema for the topic as below:

    import javax.annotation.Nullable;
    import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
            KafkaContextAware<TransformedEvent> {

        @Override
        public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element, @Nullable Long timestamp) {
            // CustomSerializer stands for your own event serializer.
            byte[] serialized = new CustomSerializer().serialize(element.getEvent());
            // Partition, timestamp and key are left null; only topic and value are set.
            return new ProducerRecord<>(getTargetTopic(element),
                    null, null, null, serialized);
        }

        // Returns the topic carried by the element itself.
        @Override
        public String getTargetTopic(TransformedEvent element) {
            return element.getTopicName();
        }
    }

- Create the producer as below:

    FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
            "DEFAULT",                  // default topic
            new CustomKafkaSchema(),
            producerConfiguration,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // or AT_LEAST_ONCE

Thanks
Ejas khan

On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid <ahmedaha...@yahoo.com> wrote:

> Hello everyone,
>
> I have a use-case where I need to have a Flink application produce to a
> variable number of Kafka topics (specified through configuration),
> potentially in different clusters, without having to redeploy the app.
> Let's assume I maintain the set of destination clusters/topics in config
> files, and have code in my Flink app to detect and reload any changes in
> these config files at runtime.
>
> I have two questions:
>
> 1. Is that a sound/reasonable thing to do? Or is it going to be
> riddled with issues?
>
> 2. To implement that, should I write a custom SinkFunction that
> maintains a set of Kafka producers? Or a custom SinkFunction that delegates
> the work to a collection of FlinkKafkaProducer instances? Is there a better
> approach?
>
> Thanks in advance.
>
> Truly,
> Ahmed
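
For the topic-selection step in the reply above, here is a minimal sketch of one way to pair each event with its target topic before it reaches the sink. Everything in it is an assumption made for illustration: the class name TopicRouting, the route method, the topicByType map, and the idea that an event carries a type field are all hypothetical, and the Event class is only a stand-in for the real event type. It has no Flink dependency so that it can be run on its own.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: wraps each event together with a topic looked up from config.
class TopicRouting {

    // Stand-in for the application's event type (assumption: it exposes a type).
    static class Event implements Serializable {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Same shape as the TransformedEvent described in the reply.
    static class TransformedEvent implements Serializable {
        final String topicName;
        final Event event;

        TransformedEvent(String topicName, Event event) {
            this.topicName = topicName;
            this.event = event;
        }
    }

    // Looks up the topic for the event's type; unknown types go to defaultTopic.
    static TransformedEvent route(Event event, Map<String, String> topicByType, String defaultTopic) {
        String topic = topicByType.getOrDefault(event.type, defaultTopic);
        return new TransformedEvent(topic, event);
    }

    public static void main(String[] args) {
        // Assumed mapping; in practice this would come from the config files.
        Map<String, String> topicByType = new HashMap<>();
        topicByType.put("order", "TOPIC_1");
        topicByType.put("payment", "TOPIC_2");

        System.out.println(route(new Event("order", "{}"), topicByType, "DEFAULT").topicName);  // prints TOPIC_1
        System.out.println(route(new Event("refund", "{}"), topicByType, "DEFAULT").topicName); // prints DEFAULT
    }
}
```

In a Flink job, a function like route would typically be applied in a map step on the event stream, and the resulting stream of TransformedEvent would then be passed to addSink with the producer shown in the reply.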