Hi Dhurandar, if you use KafkaSerializationSchema [1], you can create a ProducerRecord in which you explicitly set the output topic, and that topic can be computed arbitrarily, per record.
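For example, a schema could look like this (a minimal sketch; Event, getRouteKey(), and getPayload() are hypothetical placeholders for your record type and routing logic):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Routes every record to a topic computed from the record itself.
    public class RoutingKafkaSchema implements KafkaSerializationSchema<Event> {

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Event element, Long timestamp) {
            // Any logic works here, e.g. a lookup in a routing table that
            // users extend with new destinations.
            String topic = "out-" + element.getRouteKey();
            byte[] value = element.getPayload().getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(topic, value);
        }
    }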
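If you already have a SerializationSchema for the payload format, you don't need to reimplement it; a thin wrapper can delegate the byte serialization and only add the topic routing. Again just a sketch, with TopicRouter being a hypothetical interface you'd supply:

    import java.io.Serializable;

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Wraps an existing SerializationSchema and only adds topic routing.
    public class DelegatingKafkaSchema<T> implements KafkaSerializationSchema<T> {

        // Hypothetical routing function; must be Serializable for Flink.
        public interface TopicRouter<T> extends Serializable {
            String route(T element);
        }

        private final SerializationSchema<T> delegate;
        private final TopicRouter<T> router;

        public DelegatingKafkaSchema(SerializationSchema<T> delegate, TopicRouter<T> router) {
            this.delegate = delegate;
            this.router = router;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
            // The wrapped schema produces the bytes; we only pick the topic.
            return new ProducerRecord<>(router.route(element), delegate.serialize(element));
        }
    }

You'd instantiate it with any existing schema, e.g. new DelegatingKafkaSchema<>(new SimpleStringSchema(), s -> computeTopic(s)), where computeTopic is your own routing logic.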
You pass it while constructing the sink:

    stream.addSink(new FlinkKafkaProducer<T>(
            defaultTopic,   // the schema's ProducerRecord decides the actual topic
            serSchema,      // <-- your KafkaSerializationSchema
            props,
            producerSemantic));

If you use a specific format that Flink already provides, I'd recommend the delegate pattern: wrap the existing schema and add your custom topic logic, as in the second sketch above.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html

On Wed, Apr 29, 2020 at 11:19 PM dhurandar S <dhurandarg...@gmail.com> wrote:

> Hi,
>
> We have a use case where we have to demultiplex an incoming stream into
> multiple output streams.
>
> We read from one Kafka topic, and as output we generate multiple Kafka
> topics. The logic for generating each new topic is different and not
> known beforehand. Users of the system keep adding new logic, and the
> system then needs to produce data to the new topic with that logic
> applied to the incoming stream.
>
> Input to the system would be logic code or a SQL statement, plus a
> destination topic or S3 location. The system should be able to read this
> configuration and emit those streams, hopefully at runtime.
>
> Any guidance on whether this is possible in Flink, and some pointers on
> how it can be achieved, would be appreciated.
>
> regards,
> Dhurandar

--
Arvid Heise | Senior Java Developer