Re: Streaming app consume multiple kafka topics

2016-03-15 Thread Imre Nagi
Hi Cody,

Can you give a short example of how to use mapPartitions with a switch on topic? I've tried, but it still didn't work.

On Tue, Mar 15, 2016 at 9:45 PM, Cody Koeninger wrote:
> The direct stream gives you access to the topic. The offset range for
> each partition contains the topic. That way

Re: Streaming app consume multiple kafka topics

2016-03-15 Thread Cody Koeninger
The direct stream gives you access to the topic. The offset range for each partition contains the topic. That way you can create a single stream, and the first thing you do with it is mapPartitions with a switch on topic. Of course, it may make more sense to separate topics into different jobs,
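A minimal sketch of the single-stream approach described above, assuming the Spark 1.x `spark-streaming-kafka` direct API. The topic names (`topicA`, `topicB`), the broker address, the batch interval, and the `processByTopic` helper are hypothetical placeholders, not taken from the thread. The topic lookup relies on each RDD partition mapping one-to-one to a Kafka offset range, so it has to be the first operation on the stream, before anything that repartitions or shuffles.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object MultiTopicSketch {

  // Hypothetical per-topic handling: the topic names and the prefixing
  // are placeholders for whatever each topic really needs.
  def processByTopic(topic: String, values: Iterator[String]): Iterator[String] =
    topic match {
      case "topicA" => values.map(v => s"A:$v")
      case "topicB" => values.map(v => s"B:$v")
      case other    => values.map(v => s"$other:$v")
    }

  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit.
    val conf = new SparkConf().setAppName("MultiTopicSketch")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    // Assumed broker address and topic set.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("topicA", "topicB")

    // One direct stream covering every topic.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    val processed = stream.transform { rdd =>
      // The RDD behind a direct stream exposes one OffsetRange per partition.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.mapPartitionsWithIndex { (i, iter) =>
        // Partition i corresponds to offsetRanges(i), which carries the topic.
        val topic = offsetRanges(i).topic
        processByTopic(topic, iter.map(_._2)) // keep only the message value
      }
    }

    processed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the switch in a plain function such as `processByTopic` means the per-topic logic can be exercised without a running cluster.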

Re: Streaming app consume multiple kafka topics

2016-03-14 Thread Imre Nagi
Actually, I have tried your suggestion, but it doesn't seem to work. Let me try it once again. Thanks for your help.

Best,
Imre

On Tue, Mar 15, 2016 at 1:52 PM, Akhil Das wrote:
> One way would be to keep it this way:
>
> val stream1 = KafkaUtils.createStream(..) // for topic 1
>
> val stream2 = Kaf

Re: Streaming app consume multiple kafka topics

2016-03-14 Thread saurabh guru
I am doing the same thing this way:

// Iterate over HashSet of topics
Iterator iterator = topicsSet.iterator();
JavaPairInputDStream messages;
JavaDStream lines;
String topic = "";
// get messages stream for each topic
while (iterator.hasNext()) {

Re: Streaming app consume multiple kafka topics

2016-03-14 Thread Akhil Das
One way would be to keep it this way:

val stream1 = KafkaUtils.createStream(..) // for topic 1
val stream2 = KafkaUtils.createStream(..) // for topic 2

And you will know which stream belongs to which topic. Another approach which you can put in your code itself would be to tag the topic name a

Streaming app consume multiple kafka topics

2016-03-14 Thread Imre Nagi
Hi,

I'm trying to create a Spark Streaming application that consumes more than one topic from Kafka. Then I want to do different further processing for the data from each topic.

val kafkaStreams = {
  val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
    Map(