Thank you, Ejaskhan.
I think your suggestion would only work if all the topics were on the same
Kafka cluster. In my use-case, the topics can be on different clusters, which
is why I was thinking of rolling a custom sink that detects config changes and
instantiates Kafka producers on demand.
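
Roughly, I was picturing something along these lines (just a sketch, ignoring
checkpointing/exactly-once concerns; resolveProducerProperties stands in for my
config lookup, and TransformedEvent/CustomSerializer are the classes from your
mail):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public abstract class MultiClusterKafkaSink extends RichSinkFunction<TransformedEvent> {

    // One plain Kafka producer per target cluster, created lazily and keyed by bootstrap servers.
    private transient Map<String, KafkaProducer<byte[], byte[]>> producersByCluster;

    @Override
    public void open(Configuration parameters) {
        producersByCluster = new HashMap<>();
    }

    @Override
    public void invoke(TransformedEvent element, Context context) {
        Properties props = resolveProducerProperties(element.getTopicName());
        KafkaProducer<byte[], byte[]> producer = producersByCluster.computeIfAbsent(
                props.getProperty("bootstrap.servers"),
                servers -> new KafkaProducer<byte[], byte[]>(props));
        byte[] payload = new CustomSerializer().serialize(element.getEvent());
        producer.send(new ProducerRecord<>(element.getTopicName(), payload));
    }

    @Override
    public void close() {
        if (producersByCluster != null) {
            producersByCluster.values().forEach(KafkaProducer::close);
        }
    }

    // Placeholder: maps a topic to producer properties (bootstrap.servers, byte-array
    // serializers, ...) based on the reloaded config files.
    protected abstract Properties resolveProducerProperties(String topic);
}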


    On Tuesday, April 20, 2021, 01:11:08 PM PDT, Ejaskhan S 
<iamejask...@gmail.com> wrote:  
 
 Hi Ahmed,
If you want to dynamically produce events to different topics and you have the
logic to identify the target topic for each event, you can achieve this in the
following way:

   
   - Suppose this is your event after the transformation logic (if any): EVENT.
   - This is the target topic for this event: TOPIC_1. (I hope you have the
     logic available to identify the topic dynamically.)

   
   - Create a new DataStream (custom DS) containing the following attributes:
     topicName and event.


class TransformedEvent implements java.io.Serializable {
    String topicName;
    Event event;
    // plus a constructor and getTopicName()/getEvent() getters used below
}

   
   - Create the serialization schema for the topic as below:


class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
        KafkaContextAware<TransformedEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element, @Nullable Long timestamp) {
        byte[] serialized = new CustomSerializer().serialize(element.getEvent());
        // partition, timestamp and key are left null; the topic comes from getTargetTopic()
        return new ProducerRecord<>(getTargetTopic(element), null, null, null, serialized);
    }

    @Override
    public String getTargetTopic(TransformedEvent element) {
        return element.getTopicName();
    }
}

   
   - Create the producer as below:


FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
        "DEFAULT",                                  // default topic, overridden per record
        new CustomKafkaSchema(),
        producerConfiguration,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // or AT_LEAST_ONCE
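
Then wire it into your pipeline, for example (eventStream and resolveTopic(...)
are placeholders for your own stream and routing logic, and this assumes
TransformedEvent has a (topicName, event) constructor):

DataStream<TransformedEvent> transformedStream = eventStream
        .map(event -> new TransformedEvent(resolveTopic(event), event))
        .returns(TransformedEvent.class);

transformedStream.addSink(producer);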



Thanks,
Ejas khan


On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid <ahmedaha...@yahoo.com> wrote:

Hello everyone,
I have a use-case where I need to have a Flink application produce to a 
variable number of Kafka topics (specified through configuration), potentially 
in different clusters, without having to redeploy the app. Let's assume I 
maintain the set of destination clusters/topics in config files, and have code 
in my Flink app to detect and reload any changes in these config files at 
runtime.
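
For the reload part I was thinking of something as simple as periodically
re-reading a properties file that maps each topic to its cluster; a rough
illustration (the file-path handling and the 30-second interval are just
placeholders):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TopicRoutingConfig {

    // Latest topic -> bootstrap.servers mapping; swapped atomically on each reload.
    private volatile Properties routing = new Properties();

    public void startWatching(String path) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            Properties fresh = new Properties();
            try (FileInputStream in = new FileInputStream(path)) {
                fresh.load(in);
                routing = fresh;   // readers always see a consistent snapshot
            } catch (IOException e) {
                // keep the previous config if the reload fails
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    public String bootstrapServersFor(String topic) {
        return routing.getProperty(topic);
    }
}
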
I have two questions:
   - Is that a sound/reasonable thing to do? Or is it going to be riddled with
     issues?
   - To implement that, should I write a custom SinkFunction that maintains a
     set of Kafka producers? Or a custom SinkFunction that delegates the work to
     a collection of FlinkKafkaProducer instances? Is there a better approach?
Thanks in advance.
Truly,
Ahmed

  
