Hi, I'm building a pipeline using Flink using Kafka as source and sink. As part of the this pipeline I have multiple stages in my run command and I would like to publish some substages output into separate kafka topic. My question is can I write multiple stages of run to multiple kafka topics ?
private val env = StreamExecutionEnvironment.getExecutionEnvironment private val src = env.addSource(Source.kafka(streams.abc.topic)) override def run(stream: DataStream[TypeX]) : = { val stage1 = stream .map(doA) .map(doB) .map(doC) val stage2 = stage1.map(doD) *// Returns (isTrue: Boolean, somethingElse: TypeT)* val stage3 = stage2.filter(_.isTrue) *stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do it outside run method ?* val stage4 = stage2.filter(! _.isTrue) stage4.map(_.toString) } run(src).addSink(Write_To_Kafka_Topic_X) Ideally I will not prefer to call addSink method inside run (as mentioned in bold lines above). -- Thanks, Deepak Jha