Sorry, realized I probably didn't fully answer your question about my blog
post, as opposed to Michael Noll's.
The direct stream is really blunt: a given RDD partition is just a Kafka
topic/partition and an upper/lower bound for the range of offsets. When an
executor computes the partition, it connects to Kafka and reads the messages
in that offset range.
If the transformation you're trying to do really is per-partition, it
shouldn't matter whether you're using Scala methods or Spark methods. The
parallel speedup you're getting is all from doing the work on multiple
machines; shuffle, caching, and the other benefits of Spark aren't a factor.
If us
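(Not part of the original message; just a sketch of the point above. The element type and the counting logic are made up for illustration.)

import org.apache.spark.rdd.RDD

// Sketch: a reduction done per partition with plain Scala collection
// methods. Each partition is handled by one task on one executor, so the
// parallel speedup is the same as a Spark-level aggregation, but there is
// no shuffle and the Kafka partition -> RDD partition mapping is preserved.
def countPerPartition(rdd: RDD[String]): RDD[(String, Long)] =
  rdd.mapPartitions { iter =>
    iter.foldLeft(Map.empty[String, Long]) { (acc, record) =>
      acc.updated(record, acc.getOrElse(record, 0L) + 1L)
    }.iterator
  }

// If each key only occurs within a single partition (for example because the
// key includes the Kafka partition), this gives the same totals as
// rdd.map(r => (r, 1L)).reduceByKey(_ + _), without the shuffle.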
Hey Cody (et al.),
A few more questions related to this. It looks like our missing data issues
are fixed with this approach. Could you shed some light on a few questions
that came up?
-
Processing our data inside a single foreachPartition function appears to be
very differ
Glad that worked out for you. I updated the post on my GitHub to hopefully
clarify the issue.
On Tue, May 5, 2015 at 9:36 AM, Mark Stewart wrote:
> In case anyone else was having similar issues, the reordering and dropping
> of the reduceByKey solved the issues we were having. Thank you kindly,
In case anyone else was having similar issues, the reordering and dropping
of the reduceByKey solved the issues we were having. Thank you kindly, Mr.
Koeninger.
On Thu, Apr 30, 2015 at 3:06 PM, Cody Koeninger wrote:
> In fact, you're using the 2-arg form of reduceByKey to shrink it down to
> 1
Cody Koeninger-2 wrote
> In fact, you're using the 2-arg form of reduceByKey to shrink it down to
> 1 partition
>
> reduceByKey(sumFunc, 1)
>
> But you started with 4 Kafka partitions? So they're definitely no longer
> 1:1
True. I added the second arg because we were seeing multiple threads
In fact, you're using the 2-arg form of reduceByKey to shrink it down to
1 partition

reduceByKey(sumFunc, 1)

But you started with 4 Kafka partitions? So they're definitely no longer
1:1.
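(For context, a sketch of the distinction; this isn't from the thread, and the broker address, topic name, and summing function are placeholders. The 1:1 correspondence is between Kafka topic/partitions and the partitions of the RDDs the direct stream produces; a shuffle like reduceByKey repartitions the data, so the offset range for partition i no longer describes the records sitting in partition i of the result.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// ssc is an existing StreamingContext; broker and topic names are placeholders.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))

stream.foreachRDD { rdd =>
  // Each element of offsetRanges describes exactly the Kafka data that ended
  // up in the corresponding partition of this rdd.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // After a shuffle the correspondence is gone: reduceByKey(_ + _, 1) moves
  // everything into a single partition, so records from all four Kafka
  // partitions end up mixed together.
  val collapsed = rdd.map { case (k, v) => (k, v.length.toLong) }.reduceByKey(_ + _, 1)
}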
On Thu, Apr 30, 2015 at 1:58 PM, Cody Koeninger wrote:
> This is what I'm suggesting, in pseudocode
>
>
This is what I'm suggesting, in pseudocode:

rdd.mapPartitionsWithIndex { case (i, iter) =>
  offset = offsets(i)
  result = yourReductionFunction(iter)
  transaction {
    save(result)
    save(offset)
  }
}.foreach { (_: Nothing) => () }
where yourReductionFunction is just normal Scala code.
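(A more concrete version of that pattern might look roughly like the following. This is a sketch only, not code from the thread: the summing reduction, the table and column names, and the ScalikeJDBC calls are assumptions based on the schema mentioned later.)

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.OffsetRange
import scalikejdbc._

// Assumes a ScalikeJDBC ConnectionPool has already been set up on each executor.
def saveWithOffsets(rdd: RDD[(String, Long)], offsetRanges: Array[OffsetRange]): Unit =
  rdd.mapPartitionsWithIndex { case (i, iter) =>
    val osr = offsetRanges(i)  // the offsets for exactly this partition
    // Plain Scala reduction over the partition's records.
    val result = iter.foldLeft(0L) { case (acc, (_, v)) => acc + v }
    // Commit the result and the offset in one local transaction, so they
    // either both persist or neither does.
    DB.localTx { implicit session =>
      sql"insert into results (topic, part, total) values (${osr.topic}, ${osr.partition}, $result)"
        .update.apply()
      sql"update txn_offsets set off = ${osr.untilOffset} where topic = ${osr.topic} and part = ${osr.partition}"
        .update.apply()
    }
    Iterator.empty
  }.foreach((_: Nothing) => ())  // nothing to return; foreach just forces evaluation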
Cody Koeninger-2 wrote
> What's your schema for the offset table, and what's the definition of
> writeOffset?
The schema is the same as the one in your post: topic | partition | offset
The writeOffset is nearly identical:
def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
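(The rest of the definition is cut off in the archive. Given the topic | partition | offset schema, it presumably does something along these lines; the table and column names here are guesses, and import scalikejdbc._ is assumed for the sql interpolator.)

def writeOffset(osr: OffsetRange)(implicit session: DBSession): Unit = {
  // Persist the highest offset processed for this topic/partition.
  sql"update txn_offsets set off = ${osr.untilOffset} where topic = ${osr.topic} and part = ${osr.partition}"
    .update.apply()
}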
What's your schema for the offset table, and what's the definition of
writeOffset?
What key are you reducing on? Maybe I'm misreading the code, but it looks
like the per-partition offset is part of the key. If that's true, then you
could just do your reduction on each partition, rather than afterwards.