Hi,
Thank you for this confirmation.
Coalescing is what we do now. However, it creates very big partitions.
Guillaume
Hey,
I am not 100% sure, but from my understanding accumulators are per partition
(so per task, as it's the same thing) and are sent back to the driver with the
task result and merged there. When a task needs to be run n times (multiple
RDDs depend on this one, some partition is lost later in the chain, etc.), the
accumulator will count the values from that task n times.
So in short, I don't think you'd gain anything by using an accumulator over
what you are doing right now.
You could maybe coalesce your RDD to num-executors partitions without a
shuffle and then update the sketches. You should end up with 1 partition per
executor and thus 1 sketch per executor. You could then increase the number of
threads per task if you can use the sketches concurrently.
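To make the coalesce idea concrete, here is a rough sketch of what I mean. It is untested against your data, and several things in it are made up for illustration: the names SketchPerExecutor, Depth, Width, update, merge and globalSketch are hypothetical, and the flat Array[Int] only stands in for your real sketch. coalesce, mapPartitions and reduce are the regular RDD methods. Note that coalesce only caps the number of partitions; by itself it does not guarantee that each partition lands on a different executor.

```scala
import org.apache.spark.rdd.RDD
import scala.util.hashing.MurmurHash3

object SketchPerExecutor {
  // Hypothetical fixed dimensions; all sketches must share them to be mergeable.
  val Depth = 4
  val Width = 1 << 16

  // Flat layout: the counter for (row, col) lives at row * Width + col.
  def update(sketch: Array[Int], item: String): Unit = {
    var row = 0
    while (row < Depth) {
      val h = MurmurHash3.stringHash(item, row) // row index used as hash seed
      val col = ((h % Width) + Width) % Width   // keep the index non-negative
      sketch(row * Width + col) += 1
      row += 1
    }
  }

  // Adds b into a, cell by cell, and returns a.
  def merge(a: Array[Int], b: Array[Int]): Array[Int] = {
    var i = 0
    while (i < a.length) { a(i) += b(i); i += 1 }
    a
  }

  def globalSketch(items: RDD[String], numExecutors: Int): Array[Int] =
    items
      .coalesce(numExecutors, shuffle = false) // fewer partitions, no shuffle stage
      .mapPartitions { it =>
        // One sketch per (coalesced) partition instead of one per original partition.
        val sketch = new Array[Int](Depth * Width)
        it.foreach(update(sketch, _))
        Iterator.single(sketch)
      }
      .reduce(merge)
}
```

With this layout the driver only has to merge numExecutors sketches. The price is the one you would expect: each coalesced partition is larger, so each task runs longer.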
Eugen
2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com
<mailto:guillaume.pi...@exensa.com>>:
Hi,
I'm trying to figure out the smartest way to implement a global
count-min-sketch on accumulators. For now, we are doing that with RDDs. It
works well, but with one sketch per partition, merging takes too long.
As you probably know, a count-min sketch is a big mutable array of arrays
of ints. To distribute it, all sketches must have the same size. Since it
can be big, and since merging is not free, I would like to minimize the
number of sketches and maximize reuse and concurrent use of the sketches.
Ideally, I would like to just have one sketch per worker.
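For reference, this is roughly the structure in question, reduced to a minimal sketch. It is not our production code: the class name CountMinSketch, its methods, and the use of MurmurHash3 with the row index as seed are assumptions chosen for the example.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical minimal count-min sketch: `depth` rows of `width` Int counters.
final class CountMinSketch(val depth: Int, val width: Int) {
  require(depth > 0 && width > 0, "depth and width must be positive")

  // The big mutable array of arrays of ints.
  val table: Array[Array[Int]] = Array.ofDim[Int](depth, width)

  // Column for `item` in row `row`; the row index doubles as the hash seed.
  private def column(item: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(item, row)
    ((h % width) + width) % width // keep the index non-negative
  }

  def add(item: String, count: Int = 1): Unit = {
    var row = 0
    while (row < depth) {
      table(row)(column(item, row)) += count
      row += 1
    }
  }

  // Point estimate: the minimum counter over all rows.
  def estimate(item: String): Int =
    (0 until depth).map(row => table(row)(column(item, row))).min

  // In-place merge; only valid for sketches with identical dimensions.
  def mergeInPlace(other: CountMinSketch): this.type = {
    require(depth == other.depth && width == other.width, "sketch sizes differ")
    var row = 0
    while (row < depth) {
      var col = 0
      while (col < width) {
        table(row)(col) += other.table(row)(col)
        col += 1
      }
      row += 1
    }
    this
  }
}
```

Merging touches every one of the depth * width counters, which is why the cost grows with the number of sketches to combine and why both sides must have the same dimensions.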
I think accumulables might be the right structures for that, but it seems
that they are not shared between executors, or even between tasks.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(line 289)
/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
The localAccums map stores an accumulator for each task (it's thread-local, so
I assume each task has its own thread on executors).
If I understand correctly, each time a task starts, an accumulator is
initialized locally, updated, then sent back to the driver for merging?
So I guess accumulators may not be the way to go after all.
Any advice?
Guillaume
--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705