Hi, I'm trying to create a Spark Streaming application that consumes more than one Kafka topic. Then, I want to apply different downstream processing to the data from each topic.
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStreams = {
  // One parameter map per consumer group.
  val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
    Map(
      "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
      "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
      "group.id" -> consumerGroup,
      "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
      "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
      "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
    )
  }
  // One receiver-based stream per topic, keeping only the message payload.
  val streams = kafkaParameter.indices map { p =>
    KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      kafkaParameter(p),
      Map(topicsArr(p) -> 1),
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  unifiedStream.repartition(1)
}

kafkaStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach(x => println(x))
  }
}

So far, I'm able to get the data from several topics. However, I'm still unable to tell which topic a given record came from. Does anybody have experience doing this?

Best,
Imre
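P.S. One idea I have been toying with, but have not tested: keep the topic name attached to each record before the union, so the merged stream carries (topic, payload) pairs instead of bare payloads. The sketch below reuses ssc, topicsArr, kafkaParameter, and the decoders from my code above; the "topicA" branch is just a placeholder name, not a real topic.

// Untested sketch: tag every record with the topic it came from.
val taggedStreams = kafkaParameter.indices map { p =>
  val topic = topicsArr(p) // captured so the union can still tell records apart
  KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc,
    kafkaParameter(p),
    Map(topic -> 1),
    StorageLevel.MEMORY_ONLY_SER
  ).map { case (_, payload) => (topic, payload) }
}

val tagged = ssc.union(taggedStreams)

// Route each record by its topic; the branches are placeholders.
tagged.foreachRDD { rdd =>
  rdd.foreach { case (topic, payload) =>
    topic match {
      case "topicA" => println(s"topicA record of ${payload.length} bytes")
      case other    => println(s"$other record of ${payload.length} bytes")
    }
  }
}

Alternatively, I suppose filtering the tagged stream, e.g. tagged.filter { case (t, _) => t == topicsArr(0) }.map(_._2), would give back one DStream per topic for separate processing pipelines. Would that be a reasonable way to do it, or is there a standard approach?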