Hi Ahmed,

If you have the logic to identify the destination cluster along with the
target topic, you can achieve this by extending the solution from my earlier
mail:

1. Create one Kafka producer for each cluster. If there are 10 clusters,
create 10 producers.

2. Add a new attribute called 'clusterId' (or something more meaningful) to
TransformedEvent to identify the destination cluster.

3. Filter the DataStream<TransformedEvent> by clusterId, and add the
corresponding cluster's producer as a sink on each filtered stream (see the
sketch below).
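
A minimal sketch of step 3, assuming TransformedEvent exposes a getClusterId()
getter, clusterProps maps each cluster id to that cluster's producer
Properties (bootstrap.servers, etc.), and CustomKafkaSchema is the
serialization schema from my earlier mail (the method and variable names here
are just placeholders):

void attachClusterSinks(DataStream<TransformedEvent> transformed,
                        Map<String, Properties> clusterProps) {
    for (Map.Entry<String, Properties> entry : clusterProps.entrySet()) {
        String clusterId = entry.getKey();

        // Keep only the events destined for this cluster.
        DataStream<TransformedEvent> forCluster =
                transformed.filter(e -> clusterId.equals(e.getClusterId()));

        // One producer per cluster; the topic is still chosen per record
        // by CustomKafkaSchema.getTargetTopic().
        forCluster.addSink(new FlinkKafkaProducer<>(
                "DEFAULT",            // default topic, overridden per record
                new CustomKafkaSchema(),
                entry.getValue(),     // this cluster's producer properties
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
    }
}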


Thanks
Ejaskhan

On Wed, Apr 21, 2021, 1:49 AM Ahmed A.Hamid <ahmedaha...@yahoo.com> wrote:

> Thank you, Ejaskhan.
>
> I think your suggestion would only work if all the topics were on the same
> Kafka cluster. In my use-case, the topics can be on different clusters,
> which is why I was thinking of rolling a custom sink that detects config
> changes and instantiates Kafka producers on demand as needed.
>
>
> On Tuesday, April 20, 2021, 01:11:08 PM PDT, Ejaskhan S <
> iamejask...@gmail.com> wrote:
>
>
> Hi Ahmed,
>
> If you want to dynamically produce events to different topics and you have
> the logic to identify the target topics,  you will be able to achieve this
> in the following way.
>
>
>    - Suppose this is your event after the transformation logic (if any):
>    EVENT.
>    - This is the target topic for this event: TOPIC_1. (I assume you have
>    the logic available to identify the topic dynamically.)
>
>
>
>    - Create a new DataStream of a custom type containing the following
>    attributes: topicName and event.
>
>
> class TransformedEvent implements java.io.Serializable {
>     String topicName;
>     Event event;
> }
>
>
>
>    - Create the serialization schema as below:
>
>
> class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
>         KafkaContextAware<TransformedEvent> {
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element,
>                                                     @Nullable Long timestamp) {
>         // Serialize the wrapped event payload with the application's own
>         // serializer.
>         byte[] serialized = new CustomSerializer().serialize(element.getEvent());
>         return new ProducerRecord<>(getTargetTopic(element),
>                 null, null, null, serialized);
>     }
>
>     @Override
>     public String getTargetTopic(TransformedEvent element) {
>         // Route each record to the topic carried inside the event.
>         return element.getTopicName();
>     }
> }
>
>
>
>    - Create the producer as below,
>
>
> FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
>         "DEFAULT",
>         new CustomKafkaSchema(),
>         producerConfiguration,
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE /* or AT_LEAST_ONCE */);
>
>
>
> Thanks
> Ejas khan
>
>
>
> On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid <ahmedaha...@yahoo.com>
> wrote:
>
> Hello everyone,
>
> I have a use-case where I need to have a Flink application produce to a
> variable number of Kafka topics (specified through configuration),
> potentially in different clusters, without having to redeploy the app.
> Let's assume I maintain the set of destination clusters/topics in config
> files, and have code in my Flink app to detect and reload any changes in
> these config files at runtime.
>
> I have two questions:
>
>    1. Is that a sound/reasonable thing to do? Or is it going to be
>    riddled with issues?
>
>    2. To implement that, should I write a custom SinkFunction that
>    maintains a set of Kafka producers? Or a custom SinkFunction that delegates
>    the work to a collection of FlinkKafkaProducer instances? Is there a better
>    approach?
>
> Thanks in advance.
>
> Truly,
> Ahmed
>
>
