Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread Cody Koeninger
Sorry, realized I probably didn't fully answer your question about my blog post, as opposed to Michael Noll's. The direct stream is really blunt: a given RDD partition is just a kafka topic/partition and an upper/lower bound for the range of offsets. When an executor computes the partition, it c ...
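For concreteness, here is a minimal sketch of that model with the spark-streaming-kafka direct API of that era; the broker address, topic name, and batch interval are placeholders, not details from the thread:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val ssc = new StreamingContext(new SparkConf().setAppName("direct-stream-sketch"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("some-topic"))

    stream.foreachRDD { rdd =>
      // Each RDD partition corresponds to exactly one kafka topic/partition,
      // bounded by a fromOffset (inclusive) and an untilOffset (exclusive).
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
      }
    }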

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread Cody Koeninger
If the transformation you're trying to do really is per-partition, it shouldn't matter whether you're using Scala methods or Spark methods. The parallel speedup you're getting is all from doing the work on multiple machines; shuffle, caching, and the other benefits of Spark aren't a factor. If us ...
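A hypothetical illustration of that point (parseRecord and the sample data are stand-ins, not code from the thread): an element-wise transformation gets the same parallelism whether it is expressed as a Spark method or as plain Scala iterator code inside a per-partition function.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("per-partition-sketch"))
    val rdd = sc.parallelize(Seq("a,1", "b,2", "a,3"), numSlices = 4)

    // A placeholder per-element transformation.
    def parseRecord(line: String): (String, Long) = {
      val fields = line.split(",")
      (fields(0), fields(1).toLong)
    }

    // Both versions run element-wise within each partition; neither shuffles, so
    // the speedup comes from the 4 partitions being spread across executors,
    // not from which API is used.
    val viaSparkMethod = rdd.map(parseRecord)
    val viaScalaIterator = rdd.mapPartitions(iter => iter.map(parseRecord))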

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-14 Thread will-ob
Hey Cody (et al.), A few more questions related to this. It sounds like our missing data issues are fixed with this approach. Could you shed some light on a few questions that came up? - Processing our data inside a single foreachPartition function appears to be very differ ...

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-05 Thread Cody Koeninger
Glad that worked out for you. I updated the post on my github to hopefully clarify the issue. On Tue, May 5, 2015 at 9:36 AM, Mark Stewart wrote: > In case anyone else was having similar issues, the reordering and dropping > of the reduceByKey solved the issues we were having. Thank you kindly,

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-05-05 Thread Mark Stewart
In case anyone else was having similar issues, the reordering and dropping of the reduceByKey solved the issues we were having. Thank you kindly, Mr. Koeninger. On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger wrote: > In fact, you're using the 2 arg form of reduce by key to shrink it down to > 1

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread badgerpants
Cody Koeninger wrote: > In fact, you're using the 2-arg form of reduceByKey to shrink it down to 1 partition: reduceByKey(sumFunc, 1). But you started with 4 kafka partitions? So they're definitely no longer 1:1. True. I added the second arg because we were seeing multiple threads ...

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
In fact, you're using the 2-arg form of reduceByKey to shrink it down to 1 partition: reduceByKey(sumFunc, 1). But you started with 4 kafka partitions? So they're definitely no longer 1:1. On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger wrote: > This is what I'm suggesting, in pseudocode ...
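A sketch of why that matters (broker, topic, and sumFunc are hypothetical): the two-arg reduceByKey shuffles everything into a single partition, so the resulting RDD no longer maps 1:1 to the kafka partitions and can no longer be cast to HasOffsetRanges.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val ssc = new StreamingContext(new SparkConf(), Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("some-topic"))

    val sumFunc = (a: Long, b: Long) => a + b

    stream.foreachRDD { rdd =>
      // Fine here: the direct stream's RDD partitions map 1:1 to kafka partitions.
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // reduceByKey(sumFunc, 1) shuffles down to one partition; the result is a
      // ShuffledRDD, so the 1:1 mapping (and the HasOffsetRanges cast) is lost.
      val reduced = rdd.map { case (k, v) => (k, v.length.toLong) }.reduceByKey(sumFunc, 1)
      println(reduced.partitions.length) // 1, regardless of how many kafka partitions
    }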

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
This is what I'm suggesting, in pseudocode:

    rdd.mapPartitionsWithIndex { case (i, iter) =>
      offset = offsets(i)
      result = yourReductionFunction(iter)
      transaction {
        save(result)
        save(offset)
      }
    }.foreach { (_: Nothing) => () }

where yourReductionFunction is just normal scala co ...
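Filled out, that pseudocode might look roughly like the sketch below. This is only one way to realize it: the scalikejdbc transaction, the results/kafka_offsets table names, and the word-count style reduction are assumptions for illustration, not details given in the thread, and the connection pool setup on each executor is omitted.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import scalikejdbc._

    val ssc = new StreamingContext(new SparkConf(), Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("some-topic"))

    // Normal Scala code over one partition's records; no Spark methods, no shuffle.
    def yourReductionFunction(iter: Iterator[(String, String)]): Map[String, Long] =
      iter.foldLeft(Map.empty[String, Long]) {
        case (acc, (key, _)) => acc.updated(key, acc.getOrElse(key, 0L) + 1L)
      }

    stream.foreachRDD { rdd =>
      val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.mapPartitionsWithIndex { case (i, iter) =>
        val osr = offsets(i)                      // this partition's offset range
        val result = yourReductionFunction(iter)

        // Save the result and the offset in the same local transaction, so they
        // either both commit or both roll back (table/column names are made up).
        DB.localTx { implicit session =>
          result.foreach { case (key, count) =>
            sql"insert into results (k, cnt) values ($key, $count)".update.apply()
          }
          sql"""update kafka_offsets set off = ${osr.untilOffset}
                where topic = ${osr.topic} and part = ${osr.partition}""".update.apply()
        }
        Iterator.empty
      }.foreach((_: Nothing) => ())               // force the partitions to be computed
    }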

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread badgerpants
Cody Koeninger wrote: > What's your schema for the offset table, and what's the definition of writeOffset? The schema is the same as the one in your post: topic | partition | offset. The writeOffset is nearly identical: def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = { ...
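The preview cuts the definition off at the opening brace; a plausible completion, given that signature and the topic | partition | offset schema, is a scalikejdbc update along the lines below. The table and column names are guesses, and whether it inserts or updates depends on how the table was seeded.

    import org.apache.spark.streaming.kafka.OffsetRange
    import scalikejdbc._

    // Hypothetical completion; assumes a table along the lines of
    //   create table kafka_offsets (topic varchar, part int, off bigint,
    //                               primary key (topic, part))
    def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
      val updated =
        sql"""update kafka_offsets set off = ${osr.untilOffset}
              where topic = ${osr.topic} and part = ${osr.partition}""".update.apply()
      // Failing loudly rolls the surrounding transaction back, so results and
      // offsets stay consistent and the batch can be retried.
      if (updated != 1) {
        throw new IllegalStateException(
          s"Failed to record offset ${osr.untilOffset} for ${osr.topic}-${osr.partition}")
      }
    }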

Re: practical usage of the new "exactly-once" supporting DirectKafkaInputDStream

2015-04-30 Thread Cody Koeninger
What's your schema for the offset table, and what's the definition of writeOffset? What key are you reducing on? Maybe I'm misreading the code, but it looks like the per-partition offset is part of the key. If that's true then you could just do your reduction on each partition, rather than afte ...
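To spell that reasoning out (the key shape here is a guess at what the thread implies, not the actual code): if each key embeds its partition's offset range, then a given key can only ever occur inside one kafka partition, so a shuffle-based reduceByKey buys nothing; the same reduction can be done with plain Scala inside mapPartitionsWithIndex, as in the pseudocode above.

    // Hypothetical key shape: the offset range is baked into the key.
    case class MetricKey(topic: String, partition: Int, untilOffset: Long, name: String)

    // Reduce one partition's records locally; no shuffle is needed, because keys
    // never span partitions when they carry the partition and offset themselves.
    def reducePartition(iter: Iterator[(MetricKey, Long)]): Map[MetricKey, Long] =
      iter.foldLeft(Map.empty[MetricKey, Long]) {
        case (acc, (key, value)) => acc.updated(key, acc.getOrElse(key, 0L) + value)
      }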