Actually, I have tried your suggestion, but it doesn't seem to work. Let me try it once again.
Thanks for your help.

Best,
Imre

On Tue, Mar 15, 2016 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> One way would be to keep it this way:
>
> val stream1 = KafkaUtils.createStream(..) // for topic 1
> val stream2 = KafkaUtils.createStream(..) // for topic 2
>
> Then you will know which stream belongs to which topic.
>
> Another approach, which you can put in your code itself, would be to tag
> the topic name along with the stream that you are creating. For example,
> create a tuple (topic, stream); you can then access ._1 as the topic and
> ._2 as the stream.
>
> Thanks
> Best Regards
>
> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm trying to create a Spark Streaming application that consumes more
>> than one topic from Kafka. Then I want to do different further
>> processing on the data from each topic.
>>
>> val kafkaStreams = {
>>   // one consumer configuration per consumer group
>>   val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>>     Map(
>>       "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>>       "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>>       "group.id" -> consumerGroup,
>>       "zookeeper.connection.timeout.ms" ->
>>         ConsumerConfig.zookeeperConnectionTimeout,
>>       "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>>       "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>>     )
>>   }
>>   // one receiver-based stream per topic; each record is a (key, value)
>>   // pair, and map(_._2) keeps only the value, which does not identify
>>   // the topic it came from
>>   val streams = (0 to kafkaParameter.length - 1) map { p =>
>>     KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>       ssc,
>>       kafkaParameter(p),
>>       Map(topicsArr(p) -> 1),
>>       StorageLevel.MEMORY_ONLY_SER
>>     ).map(_._2)
>>   }
>>   // merge all per-topic streams into a single stream
>>   val unifiedStream = ssc.union(streams)
>>   unifiedStream.repartition(1)
>> }
>> kafkaStreams.foreachRDD(rdd => {
>>   rdd.foreachPartition(partitionOfRecords => {
>>     partitionOfRecords.foreach(x =>
>>       println(x)
>>     )
>>   })
>> })
>>
>> So far, I'm able to get the data from several topics. However, I'm still
>> unable to tell the data from one topic apart from the data from another.
>>
>> Does anybody have experience doing this?
>>
>> Best,
>> Imre
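A minimal sketch of the tagging approach Akhil describes, applied to the code above: instead of dropping everything but the value with `.map(_._2)`, pair each record with its topic before the union. In Spark Streaming terms that would be something like `.map { case (_, value) => (topicsArr(p), value) }` on each stream before `ssc.union`, followed by `unifiedStream.filter(_._1 == someTopic)` (where `someTopic` is a placeholder) for per-topic processing. So that the sketch runs without Spark or a Kafka broker, it models each stream as a plain Scala `Seq`; the object name `TopicTagging`, the helper names, and the topic names `topicA`/`topicB` are hypothetical.

```scala
// Sketch only: plain Scala collections stand in for Spark DStreams so the
// example runs without Spark or Kafka. All names here are hypothetical.
object TopicTagging {
  type Payload = Array[Byte]

  // Pair every record with the topic it was read from, then merge the
  // per-topic collections (the analogue of ssc.union on tagged streams).
  def tagAndUnion(streams: Seq[(String, Seq[Payload])]): Seq[(String, Payload)] =
    streams.flatMap { case (topic, records) => records.map(r => (topic, r)) }

  // Select the records of a single topic (the analogue of DStream.filter
  // on the tag, followed by dropping the tag).
  def forTopic(tagged: Seq[(String, Payload)], topic: String): Seq[Payload] =
    tagged.collect { case (t, bytes) if t == topic => bytes }

  // Apply different processing depending on the topic tag.
  def process(tagged: Seq[(String, Payload)]): Seq[String] =
    tagged.map {
      case ("topicA", bytes) => s"A:${new String(bytes, "UTF-8")}"
      case (other, bytes)    => s"$other:${bytes.length} bytes"
    }

  def main(args: Array[String]): Unit = {
    val streams = Seq(
      "topicA" -> Seq("hello".getBytes("UTF-8")),
      "topicB" -> Seq(Array[Byte](1, 2, 3))
    )
    val tagged = tagAndUnion(streams)
    process(tagged).foreach(println)
    println(forTopic(tagged, "topicB").map(_.length))
  }
}
```

Tagging before the union keeps a single unified stream, so the existing `repartition(1)` and `foreachRDD` logic can stay as it is while each record can still be routed by topic. Akhil's first option, one separate stream per topic, avoids the tag at the cost of wiring up the processing for each stream individually.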