Hi, yes, you can write the stages to several different Kafka topics. If you don't want to call addSink inside the run() method, you have to return handles to the DataStreams you want to write (here stage3 and stage4) from run() and attach the sinks outside it, for example:
private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

// No `override` and no explicit result type: run() now returns a tuple of
// two DataStreams, so its signature differs from the DataStream[String] one.
def run(stream: DataStream[TypeX]) = {
  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  val stage4 = stage2.filter(! _.isTrue)

  (stage3, stage4.map(_.toString)) // return both stages
}

val (stage3, stage4) = run(src)
stage3.addSink(Write_To_Kafka_Topic_Y)
stage4.addSink(Write_To_Kafka_Topic_X)

env.execute() // the job graph (both sinks included) only runs once execute() is called

On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dkjhan...@gmail.com> wrote:

> Hi,
> I'm building a pipeline using Flink using Kafka as source and sink. As part
> of the this pipeline I have multiple stages in my run command and I would
> like to publish some substages output into separate kafka topic.
> My question is can I write multiple stages of run to multiple kafka topics?
>
> private val env = StreamExecutionEnvironment.getExecutionEnvironment
> private val src = env.addSource(Source.kafka(streams.abc.topic))
>
> override def run(stream: DataStream[TypeX]) : = {
>
>   val stage1 = stream
>     .map(doA)
>     .map(doB)
>     .map(doC)
>
>   val stage2 = stage1.map(doD) *// Returns (isTrue: Boolean, somethingElse: TypeT)*
>
>   val stage3 = stage2.filter(_.isTrue)
>   *stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do it outside run method?*
>   val stage4 = stage2.filter(! _.isTrue)
>
>   stage4.map(_.toString)
> }
>
> run(src).addSink(Write_To_Kafka_Topic_X)
>
> Ideally I will not prefer to call addSink method inside run (as mentioned
> in bold lines above).
> --
> Thanks,
> Deepak Jha
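To make the shape of this pattern concrete without a Flink cluster, here is a minimal, collections-only analogy in plain Scala. It is not Flink code: `Seq` stands in for `DataStream`, two `ListBuffer`s stand in for the two Kafka topics, and `Record` and `toRecord` are hypothetical stand-ins for the `(isTrue, somethingElse)` type and the doA..doD stages. What it shows is the same idea as above: run() builds both branches, attaches no sinks, and hands both back to the caller.

```scala
import scala.collection.mutable.ListBuffer

// Collections-only analogy of "return both stages, add sinks outside run()".
// Record, toRecord and the ListBuffer "topics" are hypothetical stand-ins.
object MultiOutputSketch {
  // Hypothetical stand-in for the (isTrue, somethingElse) record produced by doD
  final case class Record(isTrue: Boolean, somethingElse: String)

  // Hypothetical stand-in for doA..doD collapsed into a single map step
  private def toRecord(x: Int): Record = Record(x % 2 == 0, s"value-$x")

  // Like run() in the reply: build the stages, attach no sinks,
  // and return both branches so the caller decides where each one goes.
  def run(stream: Seq[Int]): (Seq[Record], Seq[String]) = {
    val stage2 = stream.map(toRecord)
    val stage3 = stage2.filter(_.isTrue)
    val stage4 = stage2.filter(!_.isTrue)
    (stage3, stage4.map(_.toString))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical "topics": in Flink these would be two Kafka sinks
    val topicY = ListBuffer.empty[Record]
    val topicX = ListBuffer.empty[String]

    val (stage3, stage4) = run(1 to 4)
    stage3.foreach(topicY += _) // plays the role of stage3.addSink(...)
    stage4.foreach(topicX += _) // plays the role of stage4.addSink(...)

    println(topicY)
    println(topicX)
  }
}
```

Running main puts the even inputs into the "Y" buffer as records and the odd inputs into the "X" buffer as strings. Keeping run() free of sink calls also means its stages can be exercised on their own, with the caller choosing the outputs.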