Hi,

In my streaming app, I consume from Kafka. I have tried setting the number
of partitions both when calling "createStream" and later, by calling
repartition - in both cases, the tasks stubbornly run on only 2 nodes.
Since I have 11 nodes in my cluster, I was hoping to use more of them.

I am starting the job as:

nohup spark-submit --class logStreamNormalizer --master yarn \
  --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
  --executor-memory 30G --executor-cores 8 --num-executors 8 \
  --conf spark.cleaner.ttl=60 \
  log-stream-normalizer_2.10-1.0.jar \
  >normRunLog-6.log 2>normRunLogError-6.log & echo $! > run-6.pid

(Note that spark-submit options have to come before the application jar, and
spark.cleaner.ttl is passed via --conf; with the options after the jar they
were being handed to my application as arguments instead.)

My main code is:

 val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka",
   "normApp", Map("rawunstruct" -> 16))

 val propsMap = Map(
   "metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092",
   "serializer.class" -> "kafka.serializer.StringEncoder",
   "producer.type" -> "async",
   "request.required.acks" -> "1")
 val to_topic = """normStruct"""
 val writer = new KafkaOutputService(to_topic, propsMap)


 if (!configMap.keySet.isEmpty) {
   //kInMsg.repartition(8)
   val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
   outdata.foreachRDD((rdd, time) => {
     rdd.foreach(rec => { writer.output(rec) })
   })
 }

 ssc.start()
 ssc.awaitTermination()
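For reference, my understanding is that repartition returns a new DStream
rather than modifying the receiving DStream in place, so its result has to
be captured and all downstream operations chained off it - which my
commented-out line above does not do. A minimal sketch of what I believe
the wiring should look like (normalizeLog and configMap are from my code
above; the intermediate name is just illustrative):

```scala
// repartition() does not mutate kInMsg; it returns a new DStream whose
// RDDs are spread across more partitions. The repartitioned stream, not
// the original, must feed the downstream map/foreachRDD.
val kInPart = kInMsg.repartition(8)
val outdata = kInPart.map(x => normalizeLog(x._2, configMap))
```

If the map were still called on kInMsg, the repartition would be a no-op as
far as the rest of the pipeline is concerned.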

In terms of total delay, with a 5-second batch, the delays usually stay
under 5 seconds but sometimes jump to ~10 seconds. As a performance-tuning
question, does this mean I can reduce my cleaner TTL from 60 to, say, 25
(still more than double the peak delay)?
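In case it matters, here is how I would lower it in code (assuming that
setting it on the SparkConf at startup is equivalent to passing it with
--conf on the command line; 25 is just the proposed value):

```scala
// spark.cleaner.ttl is in seconds and bounds how long metadata for old
// RDDs and shuffle data is retained before periodic cleanup.
val sparkConf = new SparkConf()
  .setAppName("SparkKafkaTest")
  .set("spark.cleaner.ttl", "25")
```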

Thanks

Tim
