I set partitions to 64:

//
 kInMsg.repartition(64)
 val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
//

Still see all activity only on the two nodes that seem to be receiving
from Kafka.

On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith <secs...@gmail.com> wrote:
> TD - Apologies, didn't realize I was replying to you instead of the list.
>
> What does "numPartitions" refer to when calling createStream? I read an
> earlier thread that seemed to suggest that numPartitions translates to
> partitions created on the Spark side?
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
>
> Actually, I re-tried with 64 numPartitions in createStream and that didn't
> work. I will manually set "repartition" to 64/128 and see how that goes.
>
> Thanks.
>
>
>
>
> On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>>
>> Having 16 partitions in KafkaUtils.createStream does not translate to the
>> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
>> best way to distribute the received data between all the nodes, as long as
>> there are sufficient number of partitions (try setting it to 2x the number
>> cores given to the application).
>>
>> Yeah, in 1.0.0, ttl should be unnecessary.
>>
>>
>>
>> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
>>> <tathagata.das1...@gmail.com> wrote:
>>>>
>>>> If you are repartitioning to 8 partitions, and your node happen to have
>>>> at least 4 cores each, its possible that all 8 partitions are assigned to
>>>> only 2 nodes. Try increasing the number of partitions. Also make sure you
>>>> have executors (allocated by YARN) running on more than two nodes if you
>>>> want to use all 11 nodes in your yarn cluster.
>>>
>>>
>>> If you look at the code, I commented out the manual re-partitioning to 8.
>>> Instead, I am created 16 partitions when I call createStream. But I will
>>> increase the partitions to, say, 64 and see if I get better parallelism.
>>>
>>>>
>>>>
>>>> If you are using Spark 1.x, then you dont need to set the ttl for
>>>> running Spark Streaming. In case you are using older version, why do you
>>>> want to reduce it? You could reduce it, but it does increase the risk of 
>>>> the
>>>> premature cleaning, if once in a while things get delayed by 20 seconds. I
>>>> dont see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
>>>> shouldnt hurt performance).
>>>>
>>>
>>> I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
>>> unless I have memory issues, more aggressive pruning won't help.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>>>
>>>> TD
>>>>
>>>>
>>>> On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In my streaming app, I receive from kafka where I have tried setting
>>>>> the partitions when calling "createStream" or later, by calling 
>>>>> repartition
>>>>> - in both cases, the number of nodes running the tasks seems to be
>>>>> stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping 
>>>>> to
>>>>> use more nodes.
>>>>>
>>>>> I am starting the job as:
>>>>> nohup spark-submit --class logStreamNormalizer --master yarn
>>>>> log-stream-normalizer_2.10-1.0.jar --jars
>>>>> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
>>>>> --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
>>>>> --num-executors 8 >normRunLog-6.log 2>normRunLogError-6.log & echo $! >
>>>>> run-6.pid
>>>>>
>>>>> My main code is:
>>>>>  val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
>>>>>  val ssc = new StreamingContext(sparkConf,Seconds(5))
>>>>>  val kInMsg =
>>>>> KafkaUtils.createStream(ssc,"node-nn1-1:2181/zk_kafka","normApp",Map("rawunstruct"
>>>>> -> 16))
>>>>>
>>>>>  val propsMap = Map("metadata.broker.list" ->
>>>>> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" ->
>>>>> "kafka.serializer.StringEncoder", "producer.type" -> "async",
>>>>> "request.required.acks" -> "1")
>>>>>  val to_topic = """normStruct"""
>>>>>  val writer = new KafkaOutputService(to_topic, propsMap)
>>>>>
>>>>>
>>>>>  if (!configMap.keySet.isEmpty)
>>>>>  {
>>>>>   //kInMsg.repartition(8)
>>>>>   val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>>>>>   outdata.foreachRDD((rdd,time) => { rdd.foreach(rec => {
>>>>> writer.output(rec) }) } )
>>>>>  }
>>>>>
>>>>>  ssc.start()
>>>>>  ssc.awaitTermination()
>>>>>
>>>>> In terms of total delay, with a 5 second batch, the delays usually stay
>>>>> under 5 seconds, but sometimes jump to ~10 seconds. As a performance 
>>>>> tuning
>>>>> question, does this mean, I can reduce my cleaner ttl from 60 to say 25
>>>>> (still more than double of the peak delay)?
>>>>>
>>>>> Thanks
>>>>>
>>>>> Tim
>>>>>
>>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to