Hi,

I'm trying to create a Spark Streaming application that consumes more than
one Kafka topic, and then apply different downstream processing to the data
from each topic.

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStreams = {
  // Build one Kafka parameter map per consumer group.
  val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
    Map(
      "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
      "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
      "group.id" -> consumerGroup,
      "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
      "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
      "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
    )
  }
  // Create one receiver-based stream per topic and keep only the payload.
  val streams = kafkaParameter.indices.map { p =>
    KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      kafkaParameter(p),
      Map(topicsArr(p) -> 1),
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)
  }
  // Merge everything into a single stream with one partition.
  val unifiedStream = ssc.union(streams)
  unifiedStream.repartition(1)
}

kafkaStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach(x => println(x))
  }
}
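
One thing I've considered is skipping the union entirely and attaching
separate logic to each stream, like the sketch below, though I'd prefer to
keep a single unified stream if possible. This is untested; it assumes
streams is the sequence built above (before the union) and that topicsArr
holds the topic names in matching order:

// Sketch only: wire per-topic processing to each stream instead of merging.
streams.zip(topicsArr).foreach { case (stream, topic) =>
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      // println stands in for whatever processing this topic needs.
      partitionOfRecords.foreach(x => println(s"[$topic] $x"))
    }
  }
}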


So far, I'm able to get the data from several topics. However, once the
streams are unioned I'm still unable to tell which topic a given record
came from.
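
The only idea I have so far is to tag each record with its topic name
before the union, so the origin survives the merge. Roughly like this
(untested; again assuming topicsArr lines up one-to-one with
kafkaParameter):

// Sketch only: pair each payload with the name of the topic it came from.
val taggedStreams = kafkaParameter.indices.map { p =>
  KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc,
    kafkaParameter(p),
    Map(topicsArr(p) -> 1),
    StorageLevel.MEMORY_ONLY_SER
  ).map { case (_, payload) => (topicsArr(p), payload) } // (topic, payload)
}
val tagged = ssc.union(taggedStreams)

// The tag could then drive different processing per topic, e.g. by filtering:
val firstTopicOnly = tagged.filter { case (topic, _) => topic == topicsArr(0) }.map(_._2)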

Does anybody have experience doing this?

Best,
Imre
