Hi Cody,

Can you give a small example of how to use mapPartitions with a switch
on topic? I've tried, but it still didn't work.
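To make the question concrete, here is a minimal sketch of what I
understand you to mean, assuming the Spark 1.x + Kafka 0.8 direct API.
ssc and kafkaParams are assumed to exist, the IMPR_ACC / EVENTS_ACC
topic constants are borrowed from Saurabh's code below, and
handleImpression / handleEvent are hypothetical per-topic handlers.
Is this the pattern you had in mind?

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // One direct stream for all topics of interest
    val stream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(IMPR_ACC, EVENTS_ACC))

    stream.foreachRDD { rdd =>
      // Partition i of a direct-stream RDD holds exactly the
      // messages described by offsetRanges(i), including the topic
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val processed = rdd.mapPartitionsWithIndex { (i, iter) =>
        offsetRanges(i).topic match {
          case IMPR_ACC   => iter.map { case (_, v) => handleImpression(v) }
          case EVENTS_ACC => iter.map { case (_, v) => handleEvent(v) }
          case _          => Iterator.empty
        }
      }
      processed.count() // replace with a real action / save
    }

From what I can tell, the switch has to run on the Kafka RDD itself,
before any shuffle or repartition, since partition i lines up with
offsetRanges(i) only there.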
On Tue, Mar 15, 2016 at 9:45 PM, Cody Koeninger <c...@koeninger.org> wrote:
> The direct stream gives you access to the topic. The offset range for
> each partition contains the topic. That way you can create a single
> stream, and the first thing you do with it is mapPartitions with a
> switch on topic.
>
> Of course, it may make more sense to separate topics into different
> jobs, but if you want it all in one, that's the most straightforward
> way to do it imho.
>
> On Tue, Mar 15, 2016 at 1:55 AM, saurabh guru <saurabh.g...@gmail.com> wrote:
> > I am doing the same thing this way:
> >
> >     // Iterate over the HashSet of topics
> >     Iterator<String> iterator = topicsSet.iterator();
> >     JavaPairInputDStream<String, String> messages;
> >     JavaDStream<String> lines;
> >     String topic = "";
> >     // Get a message stream for each topic
> >     while (iterator.hasNext()) {
> >         topic = iterator.next();
> >         // Create a direct Kafka stream for the brokers and topic
> >         messages = KafkaUtils.createDirectStream(jssc, String.class,
> >                 String.class, StringDecoder.class, StringDecoder.class,
> >                 kafkaParams, new HashSet<String>(Arrays.asList(topic)));
> >
> >         // Extract the message values
> >         lines = messages.map(new Function<Tuple2<String, String>, String>() {
> >             @Override
> >             public String call(Tuple2<String, String> tuple2) {
> >                 return tuple2._2();
> >             }
> >         });
> >
> >         switch (topic) {
> >             case IMPR_ACC:
> >                 ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
> >                         new ImprMarshal());
> >                 break;
> >             case EVENTS_ACC:
> >                 EventLogProc.groupAndCount(lines, esEventIndexName,
> >                         EVENTS_ACC, new EventMarshal());
> >                 break;
> >             default:
> >                 logger.error("No matching Kafka topic found");
> >                 break;
> >         }
> >     }
> >
> > On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> >> One way would be to keep it this way:
> >>
> >>     val stream1 = KafkaUtils.createStream(..) // for topic 1
> >>     val stream2 = KafkaUtils.createStream(..) // for topic 2
> >>
> >> That way you will know which stream belongs to which topic.
> >>
> >> Another approach, which you can put in your code itself, would be to
> >> tag the topic name along with the stream that you are creating: create
> >> a tuple (topic, stream), and you will be able to access ._1 as the
> >> topic and ._2 as the stream.
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2...@gmail.com> wrote:
> >>> Hi,
> >>>
> >>> I'm just trying to create a Spark Streaming application that consumes
> >>> more than one Kafka topic. Then I want to do different further
> >>> processing on the data from each topic.
> >>>
> >>>> val kafkaStreams = {
> >>>>   val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
> >>>>     Map(
> >>>>       "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
> >>>>       "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
> >>>>       "group.id" -> consumerGroup,
> >>>>       "zookeeper.connection.timeout.ms" ->
> >>>>         ConsumerConfig.zookeeperConnectionTimeout,
> >>>>       "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
> >>>>       "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
> >>>>     )
> >>>>   }
> >>>>   val streams = (0 to kafkaParameter.length - 1) map { p =>
> >>>>     KafkaUtils.createStream[String, Array[Byte], StringDecoder,
> >>>>         DefaultDecoder](
> >>>>       ssc,
> >>>>       kafkaParameter(p),
> >>>>       Map(topicsArr(p) -> 1),
> >>>>       StorageLevel.MEMORY_ONLY_SER
> >>>>     ).map(_._2)
> >>>>   }
> >>>>   val unifiedStream = ssc.union(streams)
> >>>>   unifiedStream.repartition(1)
> >>>> }
> >>>> kafkaStreams.foreachRDD(rdd => {
> >>>>   rdd.foreachPartition(partitionOfRecords => {
> >>>>     partitionOfRecords.foreach(x =>
> >>>>       println(x)
> >>>>     )
> >>>>   })
> >>>> })
> >>>
> >>> So far I'm able to get the data from the several topics. However, I'm
> >>> still unable to tell which topic a given record came from.
> >>>
> >>> Does anybody have experience doing this?
> >>>
> >>> Best,
> >>> Imre
> >>
> >>
> >
> >
> > --
> > Thanks,
> > Saurabh
> >
> > :)
>
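For Imre's union version, Akhil's tagging idea would look roughly like
the untested sketch below: tag each record with its topic before the
union, reusing his topicsArr, kafkaParameter, and ssc (Spark 1.x
receiver-based API, as in his code).

    val streams = topicsArr.indices map { p =>
      val topic = topicsArr(p)
      KafkaUtils.createStream[String, Array[Byte], StringDecoder,
          DefaultDecoder](
        ssc,
        kafkaParameter(p),
        Map(topic -> 1),
        StorageLevel.MEMORY_ONLY_SER
      ).map { case (_, bytes) => (topic, bytes) } // tag each record with its topic
    }
    val unifiedStream = ssc.union(streams)

    unifiedStream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach { case (topic, bytes) =>
          // the topic now travels with every record through the union
          println(topic + ": " + bytes.length + " bytes")
        }
      }
    }

Unlike the partition-based approach, this survives the repartition(1)
in the original code, because the topic is carried by each record
rather than by the partition it sits in.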