I wrote a long post about how I arrived here, but in a nutshell: I don't see evidence of re-partitioning and workload distribution across the cluster. My new-fangled way of starting the job is:
run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid

Since the job spits out a lot of logs, here is how I am trying to determine whether any tasks got assigned to non-local executors:

$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

This yields no lines. If I look at resource pool usage in YARN, this app is assigned 252.5 GB of memory, 128 VCores and 9 containers.

Am I missing something here?

Thanks,

Tim

On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith <secs...@gmail.com> wrote:
> I set partitions to 64:
>
> //
> kInMsg.repartition(64)
> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> //
>
> Still see all activity only on the two nodes that seem to be receiving
> from Kafka.
>
> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith <secs...@gmail.com> wrote:
> > TD - Apologies, didn't realize I was replying to you instead of the list.
> >
> > What does "numPartitions" refer to when calling createStream? I read an
> > earlier thread that seemed to suggest that numPartitions translates to
> > partitions created on the Spark side?
> > http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
> >
> > Actually, I re-tried with 64 numPartitions in createStream and that didn't
> > work. I will manually set "repartition" to 64/128 and see how that goes.
> >
> > Thanks.
> >
> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <tathagata.das1...@gmail.com>
> > wrote:
> >>
> >> Having 16 partitions in KafkaUtils.createStream does not translate to the
> >> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
> >> best way to distribute the received data between all the nodes, as long as
> >> there are a sufficient number of partitions (try setting it to 2x the
> >> number of cores given to the application).
> >>
> >> Yeah, in 1.0.0, ttl should be unnecessary.
> >>
> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith <secs...@gmail.com> wrote:
> >>>
> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
> >>> <tathagata.das1...@gmail.com> wrote:
> >>>>
> >>>> If you are repartitioning to 8 partitions, and your nodes happen to have
> >>>> at least 4 cores each, it's possible that all 8 partitions are assigned
> >>>> to only 2 nodes. Try increasing the number of partitions. Also make sure
> >>>> you have executors (allocated by YARN) running on more than two nodes if
> >>>> you want to use all 11 nodes in your YARN cluster.
> >>>
> >>> If you look at the code, I commented out the manual re-partitioning to 8.
> >>> Instead, I am creating 16 partitions when I call createStream. But I will
> >>> increase the partitions to, say, 64 and see if I get better parallelism.
> >>>
> >>>> If you are using Spark 1.x, then you don't need to set the ttl for
> >>>> running Spark Streaming. In case you are using an older version, why do
> >>>> you want to reduce it? You could reduce it, but it does increase the risk
> >>>> of premature cleaning if once in a while things get delayed by 20
> >>>> seconds. I don't see much harm in keeping the ttl at 60 seconds (a bit of
> >>>> extra garbage shouldn't hurt performance).
> >>>
> >>> I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are
> >>> right, unless I have memory issues, more aggressive pruning won't help.
> >>>
> >>> Thanks,
> >>>
> >>> Tim
> >>>
> >>>> TD
> >>>>
> >>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> In my streaming app, I receive from Kafka, where I have tried setting
> >>>>> the partitions when calling "createStream" or, later, by calling
> >>>>> repartition - in both cases, the number of nodes running the tasks seems
> >>>>> to be stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was
> >>>>> hoping to use more nodes.
> >>>>>
> >>>>> I am starting the job as:
> >>>>> nohup spark-submit --class logStreamNormalizer --master yarn
> >>>>> log-stream-normalizer_2.10-1.0.jar --jars
> >>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> >>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
> >>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log &
> >>>>> echo $! > run-6.pid
> >>>>>
> >>>>> My main code is:
> >>>>> val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
> >>>>> val ssc = new StreamingContext(sparkConf,Seconds(5))
> >>>>> val kInMsg =
> >>>>> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct" -> 16))
> >>>>>
> >>>>> val propsMap = Map("metadata.broker.list" ->
> >>>>> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
> >>>>> "kafka.serializer.StringEncoder", "producer.type" -> "async",
> >>>>> "request.required.acks" -> "1")
> >>>>> val to_topic = """normStruct"""
> >>>>> val writer = new KafkaOutputService(to_topic, propsMap)
> >>>>>
> >>>>> if (!configMap.keySet.isEmpty)
> >>>>> {
> >>>>> //kInMsg.repartition(8)
> >>>>> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> >>>>> outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => { writer.output(rec) }) } )
> >>>>> }
> >>>>>
> >>>>> ssc.start()
> >>>>> ssc.awaitTermination()
> >>>>>
> >>>>> In terms of total delay, with a 5-second batch, the delays usually stay
> >>>>> under 5 seconds, but sometimes jump to ~10 seconds. As a performance
> >>>>> tuning question, does this mean I can reduce my cleaner ttl from 60 to,
> >>>>> say, 25 (still more than double the peak delay)?
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Tim
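
A note on the repartition snippet quoted above (a sketch only, not verified against this exact job): DStream.repartition returns a new DStream rather than modifying the stream in place, so calling kInMsg.repartition(64) on its own line and then mapping over kInMsg leaves the original partitioning untouched. Keeping the repartitioned stream and mapping over that instead would look roughly like this (kInMsg, normalizeLog, configMap and writer are the names from the code in this thread):

// Sketch: use the DStream returned by repartition(); repartition() does not
// mutate kInMsg, so its result has to be the stream that gets mapped.
val repartitioned = kInMsg.repartition(64) // or ~2x the total executor cores, per the suggestion above
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))
outdata.foreachRDD((rdd, time) => { rdd.foreach(rec => writer.output(rec)) })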
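
Separately, a single KafkaUtils.createStream call creates one receiver that runs on one executor; the Map("rawunstruct" -> 16) only controls how many consumer threads that one receiver uses. If the goal is also to spread the receiving side over more nodes, one approach not discussed in the replies above (a sketch, untested; numReceivers is a made-up knob) is to create several streams and union them before repartitioning:

// Sketch: several receivers, each occupying one executor core, unioned into one DStream.
val numReceivers = 4 // hypothetical; each receiver permanently ties up one core
val kInStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 4))
}
val kInMsg = ssc.union(kInStreams)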
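
Finally, on the spark-submit invocation at the top of the thread: spark-submit treats everything after the primary application jar as arguments to the main class, and it has no --spark.serializer / --spark.rdd.compress style options, so the --jars list and the spark.* settings in that command may not be reaching Spark at all. The resource flags (--driver-memory, --executor-memory, --executor-cores, --num-executors) and --jars belong before log-stream-normalizer_2.10-1.0.jar on the command line, and in 1.0.x the spark.* properties can instead be set on the SparkConf in the driver, along these lines (a sketch reusing the property names from the command above):

// Sketch: set the spark.* properties from the command line on the SparkConf.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("SparkKafkaTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
  .set("spark.akka.threads", "16")
  .set("spark.task.maxFailures", "64")
  .set("spark.scheduler.mode", "FAIR")
val ssc = new StreamingContext(sparkConf, Seconds(5))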