Hi,

Thank you for this confirmation.

Coalescing is what we do now; however, it creates very big partitions.

Guillaume
Hey,

I am not 100% sure, but from my understanding accumulators are per partition (so per task, since it's the same thing) and are sent back to the driver with the task result and merged there. When a task is run n times (because multiple RDDs depend on it, a partition is lost later in the chain, etc.), the accumulator will count that task's values n times. So in short, I don't think you'd gain anything by using an accumulator over what you are doing right now.
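You can see the n-times effect with a plain counter (assuming a live SparkContext `sc`; Spark 1.x API):

    val acc = sc.accumulator(0L)
    val data = sc.parallelize(1 to 100, 4).map { x => acc += 1; x }
    data.count() // acc.value is 100: one update per element, merged per task
    data.count() // the same tasks ran again, so acc.value is now 200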

You could maybe coalesce your RDD to num-executors partitions without a shuffle and then update the sketches. You should end up with one partition per executor, and thus one sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently.
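Roughly something like this (just a sketch, where `CountMinSketch` and its update/merge methods stand in for whatever implementation you use):

    import org.apache.spark.rdd.RDD

    // `CountMinSketch` is a placeholder; assume update(item) and
    // merge(other) over fixed dimensions shared by all sketches.
    def globalSketch(data: RDD[String], numExecutors: Int): CountMinSketch =
      data
        .coalesce(numExecutors, shuffle = false) // ~1 partition per executor, no shuffle
        .mapPartitions { items =>
          val sketch = new CountMinSketch() // one sketch per (coalesced) partition
          items.foreach(sketch.update)
          Iterator(sketch)
        }
        .reduce(_ merge _) // only numExecutors sketches reach the final merge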

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

    Hi,

    I'm trying to figure out the smartest way to implement a global
    count-min sketch using accumulators. For now, we are doing it with RDDs.
    It works well, but with one sketch per partition, merging takes too long.

    As you probably know, a count-min sketch is a big mutable array of arrays
    of ints. To distribute it, all sketches must have the same size. Since it
    can be big, and since merging is not free, I would like to minimize the
    number of sketches and maximize reuse and concurrent use of them.
    Ideally, I would like to have just one sketch per worker.
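    To make the merge cost concrete: merging is just an element-wise sum over
    the depth x width counts matrix, which is also why all the sketches must
    share the same dimensions. A rough sketch of the operation:

        def merge(a: Array[Array[Int]], b: Array[Array[Int]]): Array[Array[Int]] = {
          require(a.length == b.length && a.head.length == b.head.length,
            "sketches must have identical dimensions")
          for ((rowA, rowB) <- a.zip(b)) {
            var i = 0
            while (i < rowA.length) { rowA(i) += rowB(i); i += 1 }
          }
          a // merged in place into `a`
        }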

    I think accumulables might be the right structures for that, but it seems
    that they are not shared between executors, or even between tasks.

    
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (line 289):
    /**
     * This thread-local map holds per-task copies of accumulators; it is used to collect the set
     * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
     * this map is cleared by `Accumulators.clear()` (see Executor.scala).
     */
    private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
      override protected def initialValue() = Map[Long, Accumulable[_, _]]()
    }


    The localAccums map stores an accumulator for each task (it's
    thread-local, so I assume each task has its own thread on the executor).

    If I understand correctly, each time a task starts, an accumulator is
    initialized locally, updated, and then sent back to the driver for merging?
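    Even a custom merge function would not change that: each task would still
    start from its own zeroed copy. For illustration, a hypothetical
    AccumulatorParam over the flattened counts array (Spark 1.x API):

        import org.apache.spark.AccumulatorParam

        object SketchParam extends AccumulatorParam[Array[Int]] {
          // each task-local copy starts from zero-filled counts
          def zero(initial: Array[Int]): Array[Int] = new Array[Int](initial.length)
          // element-wise sum; this is how per-task results are merged on the driver
          def addInPlace(a: Array[Int], b: Array[Int]): Array[Int] = {
            var i = 0
            while (i < a.length) { a(i) += b(i); i += 1 }
            a
          }
        }

        // usage: val acc = sc.accumulator(new Array[Int](depth * width))(SketchParam)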

    So I guess accumulators may not be the way to go after all.

    Any advice?
    Guillaume
    --
    eXenSa

    *Guillaume PITEL, Président*
    +33(0)626 222 431

    eXenSa S.A.S. <http://www.exensa.com/>
    41, rue Périer - 92120 Montrouge - FRANCE
    Tel +33(0)184 163 677 / Fax +33(0)972 283 705



