Hi Vinti

I use the standalone cluster. The mgmt console provides a link to an app
UI. It has all sorts of performance info. There should be a "Stages" tab.
You can use it to find bottlenecks.

Note that the links in the mgmt console do not seem to work. The app UI runs
on the same machine as the driver. The first app will run on port 4040, the
second on 4041, ...
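
As a rough sketch of that port convention (assuming the default base port of
4040, that nothing else is holding those ports, and that spark.ui.port has not
been overridden; AppUi and driverHost are names I made up for illustration):

```scala
object AppUi {
  // Port used by the first application's UI on a driver host, per the note above.
  val BasePort = 4040

  // URL of the UI for the n-th application (0-based) started on driverHost.
  // Assumes ports are handed out sequentially starting at BasePort.
  def url(driverHost: String, n: Int): String =
    s"http://$driverHost:${BasePort + n}"
}
```

So for the second app on a driver machine you would try AppUi.url(host, 1) in a browser instead of following the console link.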

Good luck

andy

From:  Vinti Maheshwari <vinti.u...@gmail.com>
Date:  Monday, March 7, 2016 at 3:01 PM
To:  "user @spark" <user@spark.apache.org>
Subject:  Re: Spark Streaming, very slow processing and increasing
scheduling delay of kafka input stream

> Adding one more screenshot of the running Spark jobs; it shows lots of
> skipped tasks. I am not sure why it shows so many skipped tasks.
> 
> Regards
> ~Vinti
> 
> On Mon, Mar 7, 2016 at 2:52 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>> Hi,
>> 
>> My spark-streaming program seems very slow. I am using Ambari for cluster
>> setup and Kafka for data input.
>> I tried a batch size of 2 secs and a checkpointing duration of 10 secs. But
>> since the scheduling delay kept increasing, I tried increasing the batch
>> size to 5 and then 10 secs. Nothing seems to have changed with respect to
>> performance.
>> 
>> My program is doing two tasks:
>> 
>> 1) Data aggregation
>> 
>> 2) Data insertion into Hbase
>> 
>> The action that took the most time was the foreachRDD call on the DStream
>> object (state):
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>> 
>> 
>> Sample input coming from Kafka:
>> test_id, file1, 1,1,1,1,1
>> 
>> Code snippets:
>> val parsedStream = inputStream
>>   .map(line => {
>>     val splitLines = line.split(",")
>>     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>>   })
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>     prev.map(_ +: current).orElse(Some(current))
>>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>   })
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
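
As far as I can tell, the update function above keeps an element-wise running
sum per key. A Breeze-free sketch of the same idea, assuming every array for a
key has the same length. ElementwiseState is a made-up name, and returning None
on a length mismatch is my assumption about what the Try is guarding against:

```scala
object ElementwiseState {
  // Sums the previous state (if any) and the new batch arrays position by position.
  // Returns None when there is nothing to sum or the array lengths disagree.
  def update(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toSeq ++ current
    if (all.isEmpty || all.map(_.length).distinct.size != 1) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }
}
```

Keep in mind that returning None from an updateStateByKey function drops the key from the state.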
>> 
>> 
>> 
>> object Blaher {
>>   def blah(tup: (String, Array[Long])) {
>>     val hConf = HBaseConfiguration.create()
>>     ------
>>     val hTable = new HTable(hConf, tableName)
>>     val thePut = new Put(Bytes.toBytes("file_data"))
>>     thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
>>       Bytes.toBytes(tup._2.toList.toString))
>>     new ImmutableBytesWritable(Bytes.toBytes("file_data"))
>> 
>>     hTable.put(thePut)
>>   }
>> }
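
One thing that might be worth checking in the Stages tab: blah creates an HBase
configuration and a table handle for every record. A common pattern is to do
that once per partition with foreachPartition instead. A minimal sketch using
the same HBase calls as the code above. PartitionWriter and the tableName
parameter are placeholders of mine, since the post elides how tableName is set,
and I have not run this against your cluster:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

object PartitionWriter {
  // tableName is a placeholder argument; the original post does not show its value.
  def write(state: DStream[(String, Array[Long])], tableName: String): Unit =
    state.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Runs on the executor: one configuration and one table handle per partition.
        val hConf = HBaseConfiguration.create()
        val hTable = new HTable(hConf, tableName)
        try {
          records.foreach { case (file, counts) =>
            val thePut = new Put(Bytes.toBytes("file_data"))
            thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(file),
              Bytes.toBytes(counts.toList.toString))
            hTable.put(thePut)
          }
        } finally {
          hTable.close() // release the handle even if a put fails
        }
      }
    }
}
```

Whether this helps depends on what the UI shows; if the time is going into the updateStateByKey stage instead, this change will not do much.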
>> 
>> 
>> My Cluster Specifications:
>> 16 executors (1 core each and 2g memory)
>> I have attached some screenshots of the running execution.
>> 
>> Does anyone have an idea what changes I should make to speed up the processing?
>> 
>> Thanks & Regards,
>> Vinti
> 

