I wrote a long post about how I arrived here, but in a nutshell: I don't see
evidence of re-partitioning and workload distribution across the cluster.
My newfangled way of starting the job is:

run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid
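
For reference, the same properties can also be set programmatically on the
driver before the StreamingContext is created; a minimal sketch, with the keys
and values copied verbatim from the flags above:

 import org.apache.spark.SparkConf

 // Same settings as the --spark.* flags above, applied on the driver
 // via SparkConf before building the StreamingContext.
 val sparkConf = new SparkConf()
   .setAppName("SparkKafkaTest")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.rdd.compress", "true")
   .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
   .set("spark.akka.threads", "16")
   .set("spark.task.maxFailures", "64")
   .set("spark.scheduler.mode", "FAIR")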

Since the job spits out lots of logs, here is how I am trying to determine
whether any tasks got assigned to non-local executors:
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting \
  | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

Yields no lines.
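
As another sanity check, I may register a listener on the driver to log where
each task actually starts, instead of grepping executor logs; a rough,
untested sketch, assuming Spark's SparkListener API:

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

 // Prints the host and locality level of every task as it starts.
 class TaskPlacementLogger extends SparkListener {
   override def onTaskStart(taskStart: SparkListenerTaskStart) {
     val info = taskStart.taskInfo
     println("TID " + info.taskId + " started on " + info.host +
       " with locality " + info.taskLocality)
   }
 }

 // Registered once on the driver, e.g. right after the StreamingContext is
 // created: ssc.sparkContext.addSparkListener(new TaskPlacementLogger)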

If I look at resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?
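
My own back-of-the-envelope on those numbers: 8 executors x 16 cores = 128
VCores, and 8 executor containers plus 1 application master = 9 containers;
the 252.5GB looks like roughly 8 x 30G of executor memory plus per-container
overhead and YARN rounding. So the allocation itself appears to match the
submit flags, assuming I am reading the YARN numbers right.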

Thanks,

Tim







On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith <secs...@gmail.com> wrote:

> I set partitions to 64:
>
> //
>  kInMsg.repartition(64)
>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> //
>
> Still see all activity only on the two nodes that seem to be receiving
> from Kafka.
>
> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith <secs...@gmail.com> wrote:
> > TD - Apologies, didn't realize I was replying to you instead of the list.
> >
> > What does "numPartitions" refer to when calling createStream? I read an
> > earlier thread that seemed to suggest that numPartitions translates to
> > partitions created on the Spark side?
> >
> > http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
> >
> > Actually, I re-tried with 64 numPartitions in createStream and that didn't
> > work. I will manually set "repartition" to 64/128 and see how that goes.
> >
> > Thanks.
> >
> >
> >
> >
> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das
> > <tathagata.das1...@gmail.com> wrote:
> >>
> >> Having 16 partitions in KafkaUtils.createStream does not translate to the
> >> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
> >> best way to distribute the received data between all the nodes, as long as
> >> there is a sufficient number of partitions (try setting it to 2x the number
> >> of cores given to the application).
> >>
> >> Yeah, in 1.0.0, ttl should be unnecessary.
> >>
> >>
> >>
> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith <secs...@gmail.com> wrote:
> >>>
> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
> >>> <tathagata.das1...@gmail.com> wrote:
> >>>>
> >>>> If you are repartitioning to 8 partitions, and your nodes happen to have
> >>>> at least 4 cores each, it's possible that all 8 partitions are assigned
> >>>> to only 2 nodes. Try increasing the number of partitions. Also make sure
> >>>> you have executors (allocated by YARN) running on more than two nodes if
> >>>> you want to use all 11 nodes in your YARN cluster.
> >>>
> >>>
> >>> If you look at the code, I commented out the manual re-partitioning to 8.
> >>> Instead, I am creating 16 partitions when I call createStream. But I will
> >>> increase the partitions to, say, 64 and see if I get better parallelism.
> >>>
> >>>>
> >>>>
> >>>> If you are using Spark 1.x, then you don't need to set the ttl for
> >>>> running Spark Streaming. In case you are using an older version, why do
> >>>> you want to reduce it? You could reduce it, but it does increase the risk
> >>>> of premature cleaning if once in a while things get delayed by 20
> >>>> seconds. I don't see much harm in keeping the ttl at 60 seconds (a bit of
> >>>> extra garbage shouldn't hurt performance).
> >>>>
> >>>
> >>> I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are
> >>> right, unless I have memory issues, more aggressive pruning won't help.
> >>>
> >>> Thanks,
> >>>
> >>> Tim
> >>>
> >>>
> >>>
> >>>>
> >>>> TD
> >>>>
> >>>>
> >>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> In my streaming app, I receive from kafka where I have tried setting
> >>>>> the partitions when calling "createStream" or later, by calling
> >>>>> repartition - in both cases, the number of nodes running the tasks
> >>>>> seems to be stubbornly stuck at 2. Since I have 11 nodes in my cluster,
> >>>>> I was hoping to use more nodes.
> >>>>>
> >>>>> I am starting the job as:
> >>>>> nohup spark-submit --class logStreamNormalizer --master yarn
> >>>>> log-stream-normalizer_2.10-1.0.jar --jars
> >>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> >>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
> >>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
> >>>>> run-6.pid
> >>>>>
> >>>>> My main code is:
> >>>>>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
> >>>>>  val ssc = new StreamingContext(sparkConf,Seconds(5))
> >>>>>  val kInMsg =
> >>>>> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct" -> 16))
> >>>>>
> >>>>>  val propsMap = Map("metadata.broker.list" ->
> >>>>> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
> >>>>> "kafka.serializer.StringEncoder", "producer.type" -> "async",
> >>>>> "request.required.acks" -> "1")
> >>>>>  val to_topic = """normStruct"""
> >>>>>  val writer = new KafkaOutputService(to_topic, propsMap)
> >>>>>
> >>>>>
> >>>>>  if (!configMap.keySet.isEmpty)
> >>>>>  {
> >>>>>   //kInMsg.repartition(8)
> >>>>>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> >>>>>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
> >>>>> writer.output(rec) }) } )
> >>>>>  }
> >>>>>
> >>>>>  ssc.start()
> >>>>>  ssc.awaitTermination()
> >>>>>
> >>>>> In terms of total delay, with a 5 second batch, the delays usually stay
> >>>>> under 5 seconds, but sometimes jump to ~10 seconds. As a performance
> >>>>> tuning question, does this mean I can reduce my cleaner ttl from 60 to,
> >>>>> say, 25 (still more than double the peak delay)?
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Tim
> >>>>>
> >>>>
> >>>
> >>
> >
>
