Hi Ahmed,

If you want to produce events to different topics dynamically, and you
already have the logic to identify the target topic for each event, you can
do it as follows.


   - Suppose EVENT is your event after the transformation logic (if any).
   - Suppose TOPIC_1 is the target topic for this event. (I assume you
   already have the logic to identify the topic dynamically.)



   - Create a new DataStream (a custom one) whose elements carry the
   following attributes: topicName and event.


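    class TransformedEvent implements java.io.Serializable {
        String topicName;
        Event event;

        // Getters used by the serialization schema.
        String getTopicName() { return topicName; }
        Event getEvent() { return event; }
    }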
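
To make the wrapping step concrete, here is a minimal sketch in plain Java
(no Flink dependency) of deciding the topic for each event and wrapping it.
The Event fields, the TopicRouter class and the type-to-topic map are all
hypothetical; substitute your own routing logic. The wrapper class is
repeated only so that the snippet compiles on its own.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type, standing in for your own Event class.
class Event implements Serializable {
    final String type;
    final String payload;

    Event(String type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

// The wrapper: an event plus the topic it should be written to.
class TransformedEvent implements Serializable {
    private final String topicName;
    private final Event event;

    TransformedEvent(String topicName, Event event) {
        this.topicName = topicName;
        this.event = event;
    }

    String getTopicName() { return topicName; }
    Event getEvent() { return event; }
}

// Hypothetical routing rule: look the topic up by event type and
// fall back to a default topic for unknown types.
class TopicRouter implements Serializable {
    private final HashMap<String, String> topicByType;
    private final String fallbackTopic;

    TopicRouter(Map<String, String> topicByType, String fallbackTopic) {
        this.topicByType = new HashMap<>(topicByType);
        this.fallbackTopic = fallbackTopic;
    }

    TransformedEvent route(Event event) {
        String topic = topicByType.getOrDefault(event.type, fallbackTopic);
        return new TransformedEvent(topic, event);
    }
}

class RoutingExample {
    public static void main(String[] args) {
        Map<String, String> routes = new HashMap<>();
        routes.put("order", "TOPIC_1");
        TopicRouter router = new TopicRouter(routes, "DEFAULT");

        TransformedEvent routed = router.route(new Event("order", "some payload"));
        System.out.println(routed.getTopicName()); // prints TOPIC_1
    }
}
```

In a Flink job, a call like route(...) would typically sit inside a map
function, so that the stream reaching the sink is already a stream of
TransformedEvent.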



   - Create the serialization schema, which picks the target topic per element:


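    import javax.annotation.Nullable;
    import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class CustomKafkaSchema implements
            KafkaSerializationSchema<TransformedEvent>,
            KafkaContextAware<TransformedEvent> {

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                TransformedEvent element, @Nullable Long timestamp) {
            // CustomSerializer stands for your own Event-to-bytes serializer.
            byte[] serialized = new CustomSerializer().serialize(element.getEvent());
            // The topic comes from the element; partition, timestamp and key stay null.
            return new ProducerRecord<>(getTargetTopic(element),
                    null, null, null, serialized);
        }

        @Override
        public String getTargetTopic(TransformedEvent element) {
            return element.getTopicName();
        }
    }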



   - Create the producer as below,


    FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
            "DEFAULT",                // default topic
            new CustomKafkaSchema(),
            producerConfiguration,    // java.util.Properties for the Kafka producer
            // or FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
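
For reference, a minimal sketch of what producerConfiguration might contain.
The helper name and the broker address are placeholders, not something from
your setup. The transaction timeout only matters for EXACTLY_ONCE:
FlinkKafkaProducer sets transaction.timeout.ms to 1 hour by default, while
Kafka brokers cap it at 15 minutes by default (transaction.max.timeout.ms),
so you need to either lower the producer value or raise the broker limit.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Builds the Properties object handed to the FlinkKafkaProducer constructor.
    // The bootstrap address is supplied by the caller; nothing here is specific
    // to a particular cluster.
    static Properties producerConfiguration(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keep the producer transaction timeout within the broker default
        // limit of 15 minutes (only relevant for EXACTLY_ONCE).
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerConfiguration("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 900000
    }
}
```

The producer is then attached to the TransformedEvent stream with addSink(producer).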



Thanks
Ejas khan



On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid <ahmedaha...@yahoo.com> wrote:

> Hello everyone,
>
> I have a use-case where I need to have a Flink application produce to a
> variable number of Kafka topics (specified through configuration),
> potentially in different clusters, without having to redeploy the app.
> Let's assume I maintain the set of destination clusters/topics in config
> files, and have code in my Flink app to detect and reload any changes in
> these config files at runtime.
>
> I have two questions:
>
>    1. Is that a sound/reasonable thing to do? Or is it going to be
>    riddled with issues?
>
>    2. To implement that, should I write a custom SinkFunction that
>    maintains a set of Kafka producers? Or a custom SinkFunction that delegates
>    the work to a collection of FlinkKafkaProducer instances? Is there a better
>    approach?
>
> Thanks in advance.
>
> Truly,
> Ahmed
>
>
