Hi,
I'm building a pipeline in Flink with Kafka as both source and sink. As part
of this pipeline I have multiple stages in my run method, and I would
like to publish the output of some substages to separate Kafka topics.
My question is: can I write multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]): DataStream[String] = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD)  // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  *stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do this outside the run method?*
  val stage4 = stage2.filter(!_.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)


Ideally I would prefer not to call the addSink method inside run (as marked
in bold above).
-- 
Thanks,
Deepak Jha
