Hi,

In my streaming app, I receive messages from Kafka. I have tried setting the number of partitions when calling createStream, and later by calling repartition, but in both cases the number of nodes running the tasks stays stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more of them.
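One thing I should mention: rereading the API docs, DStream.repartition returns a new DStream rather than repartitioning in place, so the (commented-out) repartition call in my code below would have been discarded anyway, and each createStream call sets up only a single receiver. My understanding of the documented multi-receiver pattern is roughly the following (names and counts here are just illustrative, and ssc is the StreamingContext from my code below):

  import org.apache.spark.streaming.kafka.KafkaUtils

  // Illustrative sketch: several input streams, one receiver each,
  // unioned and then explicitly repartitioned.
  val numStreams = 4  // placeholder count
  val kafkaStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 4))
  }
  // repartition returns a NEW DStream; the result must be captured and used
  val unified = ssc.union(kafkaStreams).repartition(16)

Is that the right way to spread the receiving load across more nodes?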
I am starting the job as:

  nohup spark-submit --class logStreamNormalizer \
    --master yarn \
    log-stream-normalizer_2.10-1.0.jar \
    --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
    --executor-memory 30G \
    --spark.cleaner.ttl 60 \
    --executor-cores 8 \
    --num-executors 8 \
    > normRunLog-6.log 2> normRunLogError-6.log &
  echo $! > run-6.pid

My main code is:

  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16))
  val propsMap = Map(
    "metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092",
    "serializer.class" -> "kafka.serializer.StringEncoder",
    "producer.type" -> "async",
    "request.required.acks" -> "1")
  val to_topic = """normStruct"""
  val writer = new KafkaOutputService(to_topic, propsMap)

  // configMap is loaded earlier in the app (not shown)
  if (!configMap.keySet.isEmpty) {
    //kInMsg.repartition(8)
    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
    outdata.foreachRDD((rdd, time) => {
      rdd.foreach(rec => {
        writer.output(rec)
      })
    })
  }
  ssc.start()
  ssc.awaitTermination()

In terms of total delay, with a 5-second batch, the delays usually stay under 5 seconds but sometimes jump to ~10 seconds. As a performance-tuning question, does this mean I can reduce spark.cleaner.ttl from 60 to, say, 25 (still more than double the peak delay)?

Thanks,
Tim
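P.S. While re-reading my submit command, I noticed that spark-submit treats everything after the application JAR as arguments to the application's own main method. As written above, --jars, --executor-memory, --executor-cores, and --num-executors are therefore being passed to logStreamNormalizer rather than to spark-submit, and I believe YARN mode then falls back to its default of 2 executors, which would match the 2 nodes I am seeing. Could that be the cause? A corrected invocation would presumably look like this (same values; note that spark.cleaner.ttl is not a spark-submit flag, so it would be set on the SparkConf instead, or via --conf spark.cleaner.ttl=60 on Spark versions that support --conf):

  nohup spark-submit --class logStreamNormalizer \
    --master yarn \
    --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
    --executor-memory 30G \
    --executor-cores 8 \
    --num-executors 8 \
    log-stream-normalizer_2.10-1.0.jar \
    > normRunLog-6.log 2> normRunLogError-6.log &
  echo $! > run-6.pid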
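P.P.S. On the output side, the Spark Streaming programming guide suggests creating the producer inside foreachPartition rather than closing over a driver-side object inside rdd.foreach (which requires the writer to be serializable and ships it with every task). If KafkaOutputService can be constructed on the executors, I assume the equivalent for my code would be:

  outdata.foreachRDD { rdd =>
    rdd.foreachPartition { recs =>
      // Assumption: KafkaOutputService is constructible on the executor;
      // one producer per partition instead of a serialized driver-side writer
      val partWriter = new KafkaOutputService(to_topic, propsMap)
      recs.foreach(rec => partWriter.output(rec))
    }
  }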