> "How do I keep a balance of executors which receive data from Kafka and
which process data"

I think you're misunderstanding how the direct stream works.  The executor
which receives data is also the executor which processes data; there aren't
separate receivers.  If it's a single stage's worth of work (e.g. a straight
map / filter), the processing of a given partition is going to be done by
the executor that read it from Kafka.  If you do something involving a
shuffle (e.g. reduceByKey), other executors will do additional processing.
Which executor works on which tasks is up to the scheduler
(and getPreferredLocations, which only matters if you're running Spark on
the same nodes as Kafka).
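
For reference, here's a rough, untested sketch of the direct stream approach
(the broker host is taken from your snippet, the topic name and batch interval
below are placeholders): each Kafka partition becomes one RDD partition, narrow
operations like map/filter run on the executor that read that partition, and a
shuffle like reduceByKey spreads further work across executors.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("DirectStreamSketch"), Seconds(120))

// Placeholder broker and topic -- substitute your own.
val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafka.server.ip:9092")
val topics = Set("your-topic")

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream
  .map(_._2)              // narrow: runs on the executor that read the partition
  .filter(_.nonEmpty)     // still the same executor, same stage
  .map(url => (url, 1L))
  .reduceByKey(_ + _)     // shuffle: downstream tasks may land on other executors
  .foreachRDD(rdd => println(rdd.take(10).mkString(", ")))

ssc.start()
ssc.awaitTermination()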

On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello all,
>
> I see that, as of today, there are 3 ways one can read from Kafka in Spark
> Streaming:
> 1. KafkaUtils.createStream() (here
> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
> 2. KafkaUtils.createDirectStream() (here
> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
> 3. Kafka-spark-consumer (here
> <https://github.com/dibbhatt/kafka-spark-consumer>)
>
> My Spark Streaming application has to read from 1 Kafka topic with around
> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec),
> which reduces to around 3MB/s (~1,400 messages/sec) after filtering. After
> filtering I need to maintain the top 10,000 URL counts. I don't really care
> about exactly-once semantics, as I am only interested in a rough estimate.
>
> Code:
>
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
> sparkConf.setAppName("KafkaReader")
> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> "kafka.server.ip:9092",
>   "group.id" -> consumer_group
> )
>
> val lineStreams = (1 to N).map{ _ =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
> }
>
> ssc.union(
>   lineStreams.map(stream => {
>     stream.map(ParseStringToLogRecord)
>       .filter(record => isGoodRecord(record))
>       .map(record => record.url)
>   })
> ).window(Seconds(120), Seconds(120))  // 2 minute window
>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28)  // 30 minute moving window, 28 will probably help in parallelism
>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>   .mapPartitions(iter => {
>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 1000).iterator
>   }, true)
>   .foreachRDD((latestRDD, rddTime) => {
>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>   })
>
> ssc.start()
> ssc.awaitTermination()
>
> Questions:
>
> a) I used #2 but I found that I couldn't control how many executors would
> actually be fetching from Kafka. How do I keep a balance of executors which
> receive data from Kafka and which process data? Do they keep changing for
> every batch?
>
> b) Now I am trying to use #1, creating multiple DStreams, filtering them
> and then doing a union. I don't understand why the number of events
> processed per 120-second batch would change so drastically. PFA the
> events/sec graph while running with 1 receiver. How do I debug this?
>
> c) Which of the above 3 methods would be the most suitable for integrating
> with Kafka? Any recommendations for getting maximum performance and running
> the streaming application reliably in a production environment?
>
> --
> Thanks
> Jatin Kumar
>
>
