It works... Thanks On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi, > yes you can output the stages to several different Kafka Topics. If you > don't want to call addSink inside the run() method you somehow have to > return the handle to your stage3 DataStream, for example: > > private val env = StreamExecutionEnvironment.getExecutionEnvironment > private val src = env.addSource(Source.kafka(streams.abc.topic)) > > override def run(stream: DataStream[TypeX]) : = { > > val stage1 = stream > .map(doA) > .map(doB) > .map(doC) > > val stage2 = stage1.map(doD) *// Returns (isTrue: Boolean, somethingElse: > TypeT)* > > val stage3 = stage2.filter(_.isTrue) > val stage4 = stage2.filter(! _.isTrue) > > (stage3, stage4.map(_.toString)) // return both stages > } > > val (stage3, stage4) = run(src) > stage3.addSink(Write_To_Kafka_Topic_Y) > stage4.addSink(Write_To_Kafka_Topic_X) > > > On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dkjhan...@gmail.com> wrote: > > > Hi, > > I'm building a pipeline using Flink using Kafka as source and sink. As > part > > of the this pipeline I have multiple stages in my run command and I would > > like to publish some substages output into separate kafka topic. > > My question is can I write multiple stages of run to multiple kafka > topics > > ? > > > > private val env = StreamExecutionEnvironment.getExecutionEnvironment > > private val src = env.addSource(Source.kafka(streams.abc.topic)) > > > > override def run(stream: DataStream[TypeX]) : = { > > > > val stage1 = stream > > .map(doA) > > .map(doB) > > .map(doC) > > > > val stage2 = stage1.map(doD) *// Returns (isTrue: Boolean, > somethingElse: > > TypeT)* > > > > val stage3 = stage2.filter(_.isTrue) > > *stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do it outside run > method > > ?* > > val stage4 = stage2.filter(! _.isTrue) > > > > stage4.map(_.toString) > > } > > > > run(src).addSink(Write_To_Kafka_Topic_X) > > > > > > Ideally I will not prefer to call addSink method inside run (as mentioned > > in bold lines above). > > -- > > Thanks, > > Deepak Jha > > > -- Thanks, Deepak Jha