So were you repartitioning with the original job as well?

On Fri, Jun 19, 2015 at 9:36 PM, Tim Smith <secs...@gmail.com> wrote:
> I did try without repartition, initially, but that was even more horrible
> because instead of the allocated 100 executors, only 30 (which is the
> number of kafka partitions) would have to do the work. "MyFunc" is a
> CPU-bound task, so adding more memory per executor wouldn't help, and I saw
> that each of the 30 executors was only using one thread/core on each Spark
> box. I could go and play with threading in MyFunc, but I don't want to mess
> with threading with all the parallelism already involved, and I don't think
> in-app threading outside of what the framework does is really desirable.
>
> With repartition, there is a shuffle involved, but at least the computation
> load spreads across all 100 executors instead of just 30.
>
> On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> If that's the case, you're still only using as many read executors as
>> there are kafka partitions.
>>
>> I'd remove the repartition. If you weren't doing any shuffles in the old
>> job, and are doing a shuffle in the new job, it's not really comparable.
>>
>> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> Also, can you find from the spark UI the breakup of the stages in each
>>>> batch's jobs, and find which stage is taking more time after a while?
>>>
>>> Sure, will try to debug/troubleshoot. Are there enhancements to this
>>> specific API between 1.3 and 1.4 that can substantially change its
>>> behaviour?
>>>
>>>> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> when you say your old version was
>>>>>
>>>>> k = createStream .....
>>>>>
>>>>> were you manually creating multiple receivers? Because otherwise
>>>>> you're only using one receiver on one executor...
>>>
>>> Yes, sorry, the earlier/stable version was more like:
>>>
>>> // n being the number of kafka partitions, 1 receiver per partition
>>> val kInStreams = (1 to n).map { _ => KafkaUtils.createStream(.....) }
>>> val k = ssc.union(kInStreams)
>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>>> If that's the case I'd try direct stream without the repartitioning.
>>>>>
>>>>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Essentially, I went from:
>>>>>>
>>>>>> val k = createStream(.....)
>>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>>>
>>>>>> To:
>>>>>>
>>>>>> val kIn = createDirectStream(.....)
>>>>>> val k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
>>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>>>
>>>>>> With the new API, the app starts up and works fine for a while, but it
>>>>>> seems to deteriorate after that. With the existing API "createStream",
>>>>>> the app does deteriorate, but over a much longer period: hours vs. days.
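[For reference, a self-contained sketch of the two variants Tim describes above, assuming Spark 1.3's Kafka integration (spark-streaming-kafka). The broker/zookeeper addresses, topic name, partition and executor counts, and the myFunc/output bodies are illustrative placeholders, not code from the thread; a real app would use one variant or the other, not both.]

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaIngestSketch {
      // Placeholder for the CPU-bound per-record work described in the thread.
      def myFunc(msg: String, someParams: Map[String, String]): String = msg

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("kafka-ingest"), Seconds(10))
        val n = 30                   // number of kafka partitions (placeholder)
        val numberOfExecutors = 100  // placeholder
        val someParams = Map.empty[String, String]

        // Old, receiver-based variant: one receiver per kafka partition,
        // unioned so all n receivers feed a single DStream.
        val kInStreams = (1 to n).map { _ =>
          KafkaUtils.createStream(ssc, "zkhost:2181", "mygroup", Map("mytopic" -> 1))
        }
        val kOld = ssc.union(kInStreams)

        // New, receiver-less variant: the direct stream reads with only as
        // many tasks as there are kafka partitions, hence the repartition to
        // spread the CPU-bound work across all executors (at shuffle cost).
        val kIn = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))
        val k = kIn.repartition(numberOfExecutors)

        val dataout = k.map(x => myFunc(x._2, someParams))
        dataout.foreachRDD { rdd =>
          rdd.foreachPartition { recs =>
            recs.foreach(rec => println(rec)) // stand-in for myOutputFunc.write
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }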
>>>>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> Yes, please tell us what operation you are using.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Is there any more info you can provide / relevant code?
>>>>>>>>
>>>>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Update on performance of the new API: the new code using the
>>>>>>>>> createDirectStream API ran overnight, and when I checked the app state
>>>>>>>>> in the morning, there were massive scheduling delays :(
>>>>>>>>>
>>>>>>>>> Not sure why; I haven't investigated a whole lot. For now, I
>>>>>>>>> switched back to the createStream API build of my app. Yes, for the
>>>>>>>>> record, this is with CDH 5.4.1 and Spark 1.3.
>>>>>>>>>
>>>>>>>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the super-fast response, TD :)
>>>>>>>>>>
>>>>>>>>>> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
>>>>>>>>>> Cloudera, are you listening? :D
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you using Spark 1.3.x? That explains it. This issue has been
>>>>>>>>>>> fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with
>>>>>>>>>>> more awesome stats. :)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I just switched from "createStream" to the "createDirectStream"
>>>>>>>>>>>> API for kafka, and while things otherwise seem happy, the first
>>>>>>>>>>>> thing I noticed is that stream/receiver stats are gone from the
>>>>>>>>>>>> Spark UI :( Those stats were very handy for keeping an eye on the
>>>>>>>>>>>> health of the app.
>>>>>>>>>>>>
>>>>>>>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>>>>>>>> Accumulators? It would be really nice to get back receiver-like
>>>>>>>>>>>> stats, even though I understand that "createDirectStream" is a
>>>>>>>>>>>> receiver-less design.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Tim
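[On Tim's original question about recreating receiver-like stats under the receiver-less API, a minimal sketch of the accumulator idea he floats, assuming Spark 1.3's direct Kafka API. Named accumulator values appear on the stage detail pages of the Spark UI, and the HasOffsetRanges cast exposes the per-partition offsets each batch consumed; the accumulator name, broker/topic strings, and the log line are illustrative only.]

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    object DirectStreamStats {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("direct-stats"), Seconds(10))
        // Named accumulator: its running value is visible in the Spark UI.
        val recordCount = ssc.sparkContext.accumulator(0L, "records consumed")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))

        stream.foreachRDD { rdd =>
          // Read offsets before any shuffle (e.g. repartition) discards the
          // KafkaRDD type; each OffsetRange covers one kafka partition of
          // this batch. This runs on the driver, so println goes to its log.
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          ranges.foreach { r =>
            println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
          }
          // Count records on the executors; a rough substitute for the
          // per-receiver throughput stats the receiver UI used to show.
          rdd.foreachPartition { recs =>
            recs.foreach(_ => recordCount += 1L)
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }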