Not far, but not exactly. The RDD could be too big to fit in memory,

The idea is more like a worker-side rdd.lookup() with local cache.

Guillaume
In a sentence, is this the idea of collecting an RDD to memory on each
executor directly?

On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
Hi Guillaume,

I've long thought something like this would be useful - i.e. the ability to
broadcast RDDs directly without first pulling data through the driver.  If I
understand correctly, your requirement to "block" a matrix up and only fetch
the needed parts could be implemented on top of this by splitting an RDD
into a set of smaller RDDs and then broadcasting each one on its own.

Unfortunately nobody is working on this currently (and I couldn't promise to
have bandwidth to review it at the moment either), but I suspect we'll
eventually need to add something like this for map joins in Hive on Spark
and Spark SQL.

-Sandy



On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
<guillaume.pi...@exensa.com> wrote:
Hi,

Thanks for your answer. This is precisely the use case I'm interested in,
but I know it already, I should have mentionned it. Unfortunately this
implementation of BlockMatrix has (in my opinion) some disadvantages (the
fact that it split the matrix by range instead of using a modulo is bad for
block skewness). Besides, and more importantly, as I was writing, it uses
the join solution (actually a cogroup :
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
line 361). The reduplication of the elements of the dense matrix is thus
dependent on the block size.

Actually I'm wondering if what I want to achieve could be made with a
simple modification to the join, allowing a partition to be weakly cached
wafter being retrieved.

Guillaume


There is block matrix in Spark 1.3 -
http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix





However I believe it only supports dense matrix blocks.




Still, might be possible to use it or exetend




JIRAs:


https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434





Was based on


https://github.com/amplab/ml-matrix





Another lib:


https://github.com/PasaLab/marlin/blob/master/README.md







—
Sent from Mailbox

On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
<guillaume.pi...@exensa.com> wrote:

Hi,
I have an idea that I would like to discuss with the Spark devs. The
idea comes from a very real problem that I have struggled with since
almost a year. My problem is very simple, it's a dense matrix * sparse
matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
divided in X large blocks (one block per partition), and a sparse matrix
RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
most efficient way to perform the operation is to collectAsMap() the
dense matrix and broadcast it, then perform the block-local
mutliplications, and combine the results by column.
This is quite fine, unless the matrix is too big to fit in memory
(especially since the multiplication is performed several times
iteratively, and the broadcasts are not always cleaned from memory as I
would naively expect).
When the dense matrix is too big, a second solution is to split the big
sparse matrix in several RDD, and do several broadcasts. Doing this
creates quite a big overhead, but it mostly works, even though I often
face some problems with unaccessible broadcast files, for instance.
Then there is the terrible but apparently very effective good old join.
Since X blocks of the sparse matrix use the same block from the dense
matrix, I suspect that the dense matrix is somehow replicated X times
(either on disk or in the network), which is the reason why the join
takes so much time.
After this bit of a context, here is my idea : would it be possible to
somehow "broadcast" (or maybe more accurately, share or serve) a
persisted RDD which is distributed on all workers, in a way that would,
a bit like the IndexedRDD, allow a task to access a partition or an
element of a partition in the closure, with a worker-local memory cache
. i.e. the information about where each block resides would be
distributed on the workers, to allow them to access parts of the RDD
directly. I think that's already a bit how RDD are shuffled ?
The RDD could stay distributed (no need to collect then broadcast), and
only necessary transfers would be required.
Is this a bad idea, is it already implemented somewhere (I would love it
!) ?or is it something that could add efficiency not only for my use
case, but maybe for others ? Could someone give me some hint about how I
could add this possibility to Spark ? I would probably try to extend a
RDD into a specific SharedIndexedRDD with a special lookup that would be
allowed from tasks as a special case, and that would try to contact the
blockManager and reach the corresponding data from the right worker.
Thanks in advance for your advices
Guillaume
--
eXenSa
        
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



--
Guillaume PITEL, Président
+33(0)626 222 431

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



--
eXenSa

        
*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705

Reply via email to