Hi Cody,

Can you give a brief example of how to use mapPartitions with a switch on topic?
I've tried, but I still couldn't get it to work.

On Tue, Mar 15, 2016 at 9:45 PM, Cody Koeninger <c...@koeninger.org> wrote:

> The direct stream gives you access to the topic.  The offset range for
> each partition contains the topic.  That way you can create a single
> stream, and the first thing you do with it is mapPartitions with a
> switch on topic.
>
> Of course, it may make more sense to separate topics into different
> jobs, but if you want it all in one, that's the most straightforward
> way to do it imho.
>
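
To make the suggestion above concrete, here is a minimal plain-Java sketch of the dispatch logic, with no Spark dependency so it can be run on its own. It assumes, as I understand the direct stream API, that the batch RDD can be cast to HasOffsetRanges, that offsetRanges() returns one entry per RDD partition in partition order, and that the lookup happens inside mapPartitionsWithIndex before any shuffle. TopicRange, processPartition, and the topic names "impressions" and "events" are hypothetical stand-ins, not part of Spark or of anyone's code in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "mapPartitions with a switch on topic"; no Spark dependency.
class TopicSwitchSketch {

    // Hypothetical stand-in for the Kafka integration's OffsetRange;
    // it keeps only the topic and the Kafka partition number.
    static final class TopicRange {
        final String topic;
        final int partition;

        TopicRange(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Models the function you would pass to mapPartitionsWithIndex (assumption):
    // ranges[index] describes the Kafka topic-partition behind RDD partition `index`.
    static List<String> processPartition(int index, List<String> records, TopicRange[] ranges) {
        String topic = ranges[index].topic;
        List<String> out = new ArrayList<>();
        for (String record : records) {
            switch (topic) {
                case "impressions": // hypothetical topic name
                    out.add("impression:" + record);
                    break;
                case "events": // hypothetical topic name
                    out.add("event:" + record);
                    break;
                default: // topic with no dedicated handling
                    out.add("unknown:" + record);
                    break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // With the real direct stream, this array would come from the batch RDD's offset ranges.
        TopicRange[] ranges = {
            new TopicRange("impressions", 0),
            new TopicRange("events", 0)
        };
        System.out.println(processPartition(0, Arrays.asList("a", "b"), ranges));
        System.out.println(processPartition(1, Arrays.asList("c"), ranges));
    }
}
```

In a real job, the body of processPartition would sit inside the function passed to mapPartitionsWithIndex on the direct stream's RDD, and ranges would be captured from that RDD at the start of each batch.
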
> On Tue, Mar 15, 2016 at 1:55 AM, saurabh guru <saurabh.g...@gmail.com>
> wrote:
> > I am doing the same thing this way:
> >
> > // Iterate over the HashSet of topics
> >         Iterator<String> iterator = topicsSet.iterator();
> >         JavaPairInputDStream<String, String> messages;
> >         JavaDStream<String> lines;
> >         String topic = "";
> >         // Get the message stream for each topic
> >         while (iterator.hasNext()) {
> >             topic = iterator.next();
> >             // Create a direct Kafka stream with the brokers and this topic
> >             messages = KafkaUtils.createDirectStream(jssc, String.class,
> >                     String.class, StringDecoder.class, StringDecoder.class,
> >                     kafkaParams, new HashSet<String>(Arrays.asList(topic)));
> >
> >             // Keep only the message value of each (key, value) pair
> >             lines = messages.map(new Function<Tuple2<String, String>, String>() {
> >                 @Override
> >                 public String call(Tuple2<String, String> tuple2) {
> >                     return tuple2._2();
> >                 }
> >             });
> >
> >             // Route the lines to the processor for this topic
> >             switch (topic) {
> >             case IMPR_ACC:
> >                 ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
> >                         new ImprMarshal());
> >                 break;
> >             case EVENTS_ACC:
> >                 EventLogProc.groupAndCount(lines, esEventIndexName,
> >                         EVENTS_ACC, new EventMarshal());
> >                 break;
> >             default:
> >                 logger.error("No matching Kafka topics Found");
> >                 break;
> >             }
> >         }
> >
> > On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das <ak...@sigmoidanalytics.com>
> > wrote:
> >>
> >> One way would be to keep it this way:
> >>
> >> val stream1 = KafkaUtils.createStream(..) // for topic 1
> >>
> >> val stream2 = KafkaUtils.createStream(..) // for topic 2
> >>
> >>
> >> And you will know which stream belongs to which topic.
> >>
> >> Another approach, which you can put in your code itself, would be to tag
> >> the topic name along with the stream that you are creating. For example,
> >> create a tuple (topic, stream), and you will be able to access ._1 as the
> >> topic and ._2 as the stream.
> >>
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <imre.nagi2...@gmail.com>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to create a Spark Streaming application that consumes
> >>> more than one Kafka topic. I then want to apply different processing
> >>> to the data from each topic.
> >>>
> >>>> val kafkaStreams = {
> >>>>       val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
> >>>>         Map(
> >>>>           "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
> >>>>           "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
> >>>>           "group.id" -> consumerGroup,
> >>>>           "zookeeper.connection.timeout.ms" ->
> >>>> ConsumerConfig.zookeeperConnectionTimeout,
> >>>>           "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
> >>>>           "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
> >>>>         )
> >>>>       }
> >>>>       val streams = (0 to kafkaParameter.length - 1) map { p =>
> >>>>         KafkaUtils.createStream[String, Array[Byte], StringDecoder,
> >>>> DefaultDecoder](
> >>>>           ssc,
> >>>>           kafkaParameter(p),
> >>>>           Map(topicsArr(p) -> 1),
> >>>>           StorageLevel.MEMORY_ONLY_SER
> >>>>         ).map(_._2)
> >>>>       }
> >>>>       val unifiedStream = ssc.union(streams)
> >>>>       unifiedStream.repartition(1)
> >>>>     }
> >>>>     kafkaStreams.foreachRDD(rdd => {
> >>>>       rdd.foreachPartition(partitionOfRecords => {
> >>>>         partitionOfRecords.foreach ( x =>
> >>>>           println(x)
> >>>>         )
> >>>>       })
> >>>>     })
> >>>
> >>>
> >>> So far, I'm able to get the data from several topics. However, I'm still
> >>> unable to tell which topic a given record came from.
> >>>
> >>> Does anybody have experience doing this?
> >>>
> >>> Best,
> >>> Imre
> >>
> >>
> >
> >
> >
> > --
> > Thanks,
> > Saurabh
> >
> > :)
>
