If you are repartitioning to 8 partitions, and your node happen to have at
least 4 cores each, its possible that all 8 partitions are assigned to only
2 nodes. Try increasing the number of partitions. Also make sure you have
executors (allocated by YARN) running on more than two nodes if you want to
use all 11 nodes in your yarn cluster.

If you are using Spark 1.x, then you dont need to set the ttl for running
Spark Streaming. In case you are using older version, why do you want to
reduce it? You could reduce it, but it does increase the risk of the
premature cleaning, if once in a while things get delayed by 20 seconds. I
dont see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
shouldnt hurt performance).

TD


On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com> wrote:

> Hi,
>
> In my streaming app, I receive from kafka where I have tried setting the
> partitions when calling "createStream" or later, by calling repartition -
> in both cases, the number of nodes running the tasks seems to be stubbornly
> stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more
> nodes.
>
> I am starting the job as:
> nohup spark-submit --class logStreamNormalizer --master yarn
> log-stream-normalizer_2.10-1.0.jar --jars
> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
> run-6.pid
>
> My main code is:
>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>  val ssc = new StreamingContext(sparkConf,Seconds(5))
>  val kInMsg =
> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct"
> -> 16))
>
>  val propsMap = Map("metadata.broker.list" ->
> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
> "kafka.serializer.StringEncoder", "producer.type" -> "async",
> "request.required.acks" -> "1")
>  val to_topic = """normStruct"""
>  val writer = new KafkaOutputService(to_topic, propsMap)
>
>
>  if (!configMap.keySet.isEmpty)
>  {
>   //kInMsg.repartition(8)
>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
> writer.output(rec) }) } )
>  }
>
>  ssc.start()
>  ssc.awaitTermination()
>
> In terms of total delay, with a 5 second batch, the delays usually stay
> under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning
> question, does this mean, I can reduce my cleaner ttl from 60 to say 25
> (still more than double of the peak delay)?
>
> Thanks
>
> Tim
>
>

Reply via email to