Since RDDs aren't designed as random-access maps, and are basically bits of bookkeeping that make sense only on the driver, I think the realistic realization of something like this in Spark would be "collect the RDD to a local data structure", if anything.
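For reference, a minimal sketch of that workaround (names are illustrative, and it of course assumes the collected map fits in memory on the driver and on every executor):

import org.apache.spark.{SparkConf, SparkContext}

object CollectThenBroadcast {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("collect-then-broadcast"))

    // The RDD we would like to "look up" from inside tasks.
    val table = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))

    // Collect it to a driver-local map and ship it once to every executor.
    val localTable = sc.broadcast(table.collectAsMap())

    // Tasks now do plain random-access lookups against the local copy.
    val keys = sc.parallelize(Seq(2, 3, 2, 1))
    keys.map(k => (k, localTable.value.get(k))).collect().foreach(println)

    sc.stop()
  }
}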
It sounds like you're looking for a distributed cache, and there are frameworks for that which can be used with Spark, without Spark having to rebuild that functionality itself.

On Mon, Mar 23, 2015 at 12:00 PM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:
> Not far, but not exactly. The RDD could be too big to fit in memory.
>
> The idea is more like a worker-side rdd.lookup() with a local cache.
>
> Guillaume
>
> In a sentence, is this the idea of collecting an RDD to memory on each
> executor directly?
>
> On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
> Hi Guillaume,
>
> I've long thought something like this would be useful - i.e. the ability
> to broadcast RDDs directly without first pulling data through the driver.
> If I understand correctly, your requirement to "block" a matrix up and
> only fetch the needed parts could be implemented on top of this by
> splitting an RDD into a set of smaller RDDs and then broadcasting each
> one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise
> to have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:
>
> Hi,
>
> Thanks for your answer. This is precisely the use case I'm interested in,
> but I know it already; I should have mentioned it. Unfortunately this
> implementation of BlockMatrix has (in my opinion) some disadvantages (the
> fact that it splits the matrix by range instead of using a modulo is bad
> for block skewness). Besides, and more importantly, as I was writing, it
> uses the join solution (actually a cogroup:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
> line 361). The duplication of the elements of the dense matrix is thus
> dependent on the block size.
>
> Actually I'm wondering if what I want to achieve could be done with a
> simple modification to the join, allowing a partition to be weakly cached
> after being retrieved.
>
> Guillaume
>
> There is a block matrix type in Spark 1.3 -
> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
> However I believe it only supports dense matrix blocks.
>
> Still, it might be possible to use it or extend it.
>
> JIRAs:
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
> It was based on
> https://github.com/amplab/ml-matrix
>
> Another lib:
> https://github.com/PasaLab/marlin/blob/master/README.md
>
> —
> Sent from Mailbox
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:
>
> Hi,
>
> I have an idea that I would like to discuss with the Spark devs. The idea
> comes from a very real problem that I have struggled with for almost a
> year. My problem is very simple: it's a dense matrix * sparse matrix
> operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is divided
> in X large blocks (one block per partition), and a sparse matrix
> RDD[((Int,Int),Array[Array[(Int,Float)]])], divided in X * Y blocks. The
> most efficient way to perform the operation is to collectAsMap() the
> dense matrix and broadcast it, then perform the block-local
> multiplications, and combine the results by column.
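To make the approach described just above concrete, here is a rough, self-contained sketch of the collect-and-broadcast block multiplication (using org.jblas.FloatMatrix). The exact block layout is an assumption for illustration: the dense matrix is kept as column blocks keyed by block index, sparse blocks are keyed by (rowBlock, colBlock) and stored as simplified COO triplets rather than the nested arrays above, and partial products are summed per result column block.

import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.FloatMatrix

object BroadcastBlockMultiply {
  // One sparse block, simplified to COO triplets (rowInBlock, colInBlock, value).
  type SparseBlock = Array[(Int, Int, Float)]

  // dense (m x p) * sparse (p x q) for a single pair of blocks.
  def multiplyBlock(dense: FloatMatrix, sparse: SparseBlock, q: Int): FloatMatrix = {
    val out = new FloatMatrix(dense.rows, q)
    for ((r, c, v) <- sparse; k <- 0 until dense.rows)
      out.put(k, c, out.get(k, c) + dense.get(k, r) * v)
    out
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-block-multiply"))

    // Dense matrix D as X column blocks keyed by column-block index i.
    // In the real setting this map would come from denseRdd.collectAsMap().
    val denseBlocks: Map[Int, FloatMatrix] = Map(
      0 -> new FloatMatrix(Array(Array(1f, 2f), Array(3f, 4f))),
      1 -> new FloatMatrix(Array(Array(5f, 6f), Array(7f, 8f))))

    // Sparse matrix S as X * Y blocks keyed by (rowBlock i, colBlock j).
    val sparseBlocks = sc.parallelize(Seq[((Int, Int), SparseBlock)](
      ((0, 0), Array((0, 1, 2f))),
      ((1, 0), Array((1, 0, 3f)))))

    // Broadcast the whole dense matrix once to every executor.
    val bcDense = sc.broadcast(denseBlocks)

    // Block-local multiplications: each sparse block (i, j) uses dense block i;
    // partial products are then combined (summed) per result column block j.
    val q = 2 // columns per sparse block in this toy example
    val result = sparseBlocks
      .map { case ((i, j), block) => (j, multiplyBlock(bcDense.value(i), block, q)) }
      .reduceByKey(_.add(_))

    result.collect().foreach { case (j, m) => println(s"result column block $j:\n$m") }
    sc.stop()
  }
}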
> This is quite fine, unless the matrix is too big to fit in memory
> (especially since the multiplication is performed several times
> iteratively, and the broadcasts are not always cleaned from memory as I
> would naively expect).
>
> When the dense matrix is too big, a second solution is to split the big
> sparse matrix into several RDDs and do several broadcasts. Doing this
> creates quite a big overhead, but it mostly works, even though I often
> face problems with inaccessible broadcast files, for instance.
>
> Then there is the terrible but apparently very effective good old join.
> Since X blocks of the sparse matrix use the same block from the dense
> matrix, I suspect that the dense matrix is somehow replicated X times
> (either on disk or over the network), which is the reason why the join
> takes so much time.
>
> After this bit of context, here is my idea: would it be possible to
> somehow "broadcast" (or maybe more accurately, share or serve) a
> persisted RDD which is distributed on all workers, in a way that would,
> a bit like IndexedRDD, allow a task to access a partition or an element
> of a partition from within a closure, with a worker-local memory cache?
> That is, the information about where each block resides would be
> distributed to the workers, to allow them to access parts of the RDD
> directly. I think that's already a bit how RDDs are shuffled?
>
> The RDD could stay distributed (no need to collect then broadcast), and
> only the necessary transfers would be required.
>
> Is this a bad idea? Is it already implemented somewhere (I would love
> it!)? Or is it something that could add efficiency not only for my use
> case, but maybe for others? Could someone give me some hints about how I
> could add this possibility to Spark? I would probably try to extend RDD
> into a specific SharedIndexedRDD with a special lookup() that would be
> allowed from tasks as a special case, and that would contact the
> BlockManager to reach the corresponding data from the right worker.
>
> Thanks in advance for your advice
>
> Guillaume
>
> --
> eXenSa
>
> *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
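And for what it's worth, a very rough sketch of the "worker-local memory cache" half of the proposal above. Everything here is hypothetical (names included): it is just a per-executor JVM singleton that memoizes blocks behind a caller-supplied fetch function, and it does not implement the BlockManager fetch that a real SharedIndexedRDD would need.

import java.util.{Collections, LinkedHashMap => JLinkedHashMap}

// Hypothetical sketch: a per-executor (per-JVM) LRU cache for blocks of a
// shared RDD. Because Scala objects are instantiated once per JVM, every task
// running on the same executor sees the same cache instance.
object WorkerLocalBlockCache {
  private val maxBlocks = 16

  // Access-ordered LinkedHashMap that evicts the least recently used block.
  private val cache = Collections.synchronizedMap(
    new JLinkedHashMap[Int, AnyRef](maxBlocks, 0.75f, true) {
      override def removeEldestEntry(eldest: java.util.Map.Entry[Int, AnyRef]): Boolean =
        size() > maxBlocks
    })

  // Return the cached block, or fetch and cache it on a miss. In the real
  // proposal the miss path would ask the BlockManager of the executor holding
  // the partition; here it is whatever function the caller passes in.
  def getOrFetch[T <: AnyRef](blockId: Int)(fetch: Int => T): T = {
    val hit = cache.get(blockId)
    if (hit != null) hit.asInstanceOf[T]
    else {
      val block = fetch(blockId)
      cache.put(blockId, block)
      block
    }
  }
}

// Inside a task closure one would then write something like:
//   val denseBlock = WorkerLocalBlockCache.getOrFetch(i)(id => fetchBlockSomehow(id))
// where fetchBlockSomehow is whatever remote-fetch mechanism ends up being used.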