So were you repartitioning with the original job as well?

On Fri, Jun 19, 2015 at 9:36 PM, Tim Smith <secs...@gmail.com> wrote:

> I did try without repartition initially, but that was even worse: instead
> of the allocated 100 executors, only 30 (the number of kafka partitions)
> ended up doing the work. "MyFunc" is a CPU-bound task, so adding more
> memory per executor wouldn't help, and I saw that each of the 30 executors
> was only using one thread/core on each Spark box. I could go and play with
> threading inside MyFunc, but I don't want to mix manual threading with all
> the parallelism already involved, and I don't think in-app threading
> outside of what the framework does is really desirable.
>
> With repartition, there is shuffle involved, but at least the computation
> load spreads across all 100 executors instead of just 30.
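>
> Roughly, the difference looks like this (just a sketch; "kIn", "myFunc"
> and "someParams" are the names from my snippets further down the thread,
> and the numbers match my setup of 30 kafka partitions and 100 executors):
>
> // without repartition: map() runs as 30 tasks, one per kafka partition
> val narrow = kIn.map(x => myFunc(x._2, someParams))
>
> // with repartition: pay one shuffle, but map() then runs as 100 tasks
> val spread = kIn.repartition(100).map(x => myFunc(x._2, someParams))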
>
>
>
>
> On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> If that's the case, you're still only using as many read executors as
>> there are kafka partitions.
>>
>> I'd remove the repartition. If you weren't doing any shuffles in the old
>> job, and are doing a shuffle in the new job, it's not really comparable.
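>>
>> i.e. something like this (a sketch, reusing the names from your snippet
>> below):
>>
>> val kIn = createDirectStream .....
>> // no repartition: no shuffle, one task per kafka partition
>> val dataout = kIn.map(x => myFunc(x._2, someParams))
>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))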
>>
>> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Also, can you find from the Spark UI the breakdown of the stages in each
>>>> batch's jobs, and see which stage starts taking more time after a while?
>>>>
>>>
>>> Sure, will try to debug/troubleshoot. Are there enhancements to this
>>> specific API between 1.3 and 1.4 that could substantially change its
>>> behaviour?
>>>
>>>
>>>> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> When you say your old version was
>>>>>
>>>>> k = createStream .....
>>>>>
>>>>> were you manually creating multiple receivers?  Because otherwise
>>>>> you're only using one receiver on one executor...
>>>>>
>>>>
>>> Yes, sorry, the earlier/stable version was more like:
>>> // n = number of kafka partitions, 1 receiver per partition
>>> val kInStreams = (1 to n).map { _ => KafkaUtils.createStream ............ }
>>> val k = ssc.union(kInStreams)
>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>>
>>>
>>>>
>>>>> If that's the case I'd try direct stream without the repartitioning.
>>>>>
>>>>>
>>>>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Essentially, I went from:
>>>>>> val k = createStream .....
>>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>>>
>>>>>> To:
>>>>>> val kIn = createDirectStream .....
>>>>>> val k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
>>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>>>
>>>>>> With the new API, the app starts up and works fine for a while, but
>>>>>> then seems to deteriorate. With the existing "createStream" API, the
>>>>>> app also deteriorates, but over a much longer period: hours for the
>>>>>> new API versus days for the old one.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, please tell us what operations you are using.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger <c...@koeninger.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Is there any more info you can provide / relevant code?
>>>>>>>>
>>>>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith <secs...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Update on performance of the new API: the new code using the
>>>>>>>>> createDirectStream API ran overnight, and when I checked the app
>>>>>>>>> state in the morning, there were massive scheduling delays :(
>>>>>>>>>
>>>>>>>>> Not sure why; I haven't investigated a whole lot yet. For now, I've
>>>>>>>>> switched back to the createStream build of my app. Yes, for the
>>>>>>>>> record, this is with CDH 5.4.1 and Spark 1.3.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith <secs...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the super-fast response, TD :)
>>>>>>>>>>
>>>>>>>>>> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
>>>>>>>>>> Cloudera, are you listening? :D
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <
>>>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using Spark 1.3.x? That explains it. This issue has been
>>>>>>>>>>> fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with
>>>>>>>>>>> more awesome stats. :)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I just switched from the "createStream" to the "createDirectStream"
>>>>>>>>>>>> API for kafka, and while things otherwise seem happy, the first
>>>>>>>>>>>> thing I noticed is that stream/receiver stats are gone from the
>>>>>>>>>>>> Spark UI :( Those stats were very handy for keeping an eye on the
>>>>>>>>>>>> health of the app.
>>>>>>>>>>>>
>>>>>>>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>>>>>>>> accumulators? It would be really nice to get back receiver-like
>>>>>>>>>>>> stats, even though I understand that "createDirectStream" is a
>>>>>>>>>>>> receiver-less design.
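>>>>>>>>>>>>
>>>>>>>>>>>> One idea I may try (just a sketch, not tested; "recordsProcessed"
>>>>>>>>>>>> is a name I made up): a named accumulator should show up on the
>>>>>>>>>>>> stage pages of the Spark UI, so incrementing one per partition
>>>>>>>>>>>> would give a rough records-processed stat:
>>>>>>>>>>>>
>>>>>>>>>>>> val recordsProcessed = ssc.sparkContext.accumulator(0L, "recordsProcessed")
>>>>>>>>>>>> dataout.foreachRDD { rdd =>
>>>>>>>>>>>>   rdd.foreachPartition { rec =>
>>>>>>>>>>>>     // materialize the partition so it can be counted and then written
>>>>>>>>>>>>     val buffered = rec.toVector
>>>>>>>>>>>>     recordsProcessed += buffered.size
>>>>>>>>>>>>     myOutputFunc.write(buffered.iterator)
>>>>>>>>>>>>   }
>>>>>>>>>>>> }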
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Tim
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
