Hi Rakesh,

Thanks for the suggestion. Each block of the original matrix is in a separate partition, and each block of the transposed matrix is also in a separate partition. The partition numbers are the same for the pairs of blocks that are multiplied together, and each partition is on a separate worker. Essentially, I want to force each worker to multiply exactly two blocks. As far as I understand, this should be the optimal configuration for multiplication. Having several blocks in each partition, as you suggested, is not optimal, is it?
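For reference, here is a minimal sketch of the co-partitioning I have in mind; the OneBlockPerPartition class is my own illustration, not part of MLlib. Since each of my matrices has a single block row or a single block column, one of the two block indices is always zero, so i + j identifies the matching pair of blocks:

import org.apache.spark.Partitioner
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Illustration only: pin block (i, j) to partition (i + j) % numBlocks, so that
// block k of the transposed matrix and block k of the original matrix land in
// partitions with the same number, i.e. on the same worker.
class OneBlockPerPartition(numBlocks: Int) extends Partitioner {
  override def numPartitions: Int = numBlocks
  override def getPartition(key: Any): Int = key match {
    // BlockMatrix keys are (rowBlockIndex, colBlockIndex) pairs.
    case (i: Int, j: Int) => (i + j) % numBlocks
    case _ => 0
  }
}

// Hypothetical usage with bm, blockSize and parallelism from my test code below:
// val repartitioned = bm.blocks.partitionBy(new OneBlockPerPartition(parallelism))
// val bmCopartitioned = new BlockMatrix(repartitioned, blockSize, blockSize)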
Best regards,
Alexander

BlockMatrix stores the data as key -> matrix pairs, and multiply does a reduceByKey operation, aggregating matrices per key. Since you said each block resides in a separate partition, reduceByKey might effectively be shuffling all of the data. A better way to go about this is to allow multiple blocks within each partition, so that reduceByKey does a local reduce before aggregating across nodes (see the sketch after the quoted message below).

Rakesh

On Mon, Jul 13, 2015 at 9:24 PM Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Dear Spark developers,

I am trying to perform BlockMatrix multiplication in Spark. My test is as follows: 1) create a matrix of N blocks, such that each row of the block matrix contains only one block and each block resides in a separate partition on a separate node, 2) transpose the block matrix, and 3) multiply the transposed matrix by the original non-transposed one. This should preserve data locality, so there should be no need for a shuffle. However, I observe a huge shuffle with a 50000x10000 matrix split into 10000x10000 blocks, i.e. 5 blocks per matrix. Could you suggest what is wrong? My setup is Spark 1.4 with one master and 5 worker nodes, each with a 2.2 GHz Xeon and 16 GB of RAM. Below is the test code:

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

val parallelism = 5
val blockSize = 10000
val rows = parallelism * blockSize
val columns = blockSize
val size = rows * columns
assert(rows % blockSize == 0)
assert(columns % blockSize == 0)
val rowBlocks = rows / blockSize
val columnBlocks = columns / blockSize

// Generate one random block per (row, column) block coordinate,
// spread across `parallelism` partitions.
val rdd = sc.parallelize(
  { for (i <- 0 until rowBlocks; j <- 0 until columnBlocks) yield (i, j) },
  parallelism).map(coord => (coord, Matrices.rand(blockSize, blockSize, util.Random.self)))

val bm = new BlockMatrix(rdd, blockSize, blockSize).cache()
bm.validate()
val mb = bm.transpose.cache()
mb.validate()

// Time A^T * A.
val t = System.nanoTime()
val ata = mb.multiply(bm)
ata.validate()
println(rows + "x" + columns + ", block:" + blockSize + "\t" + (System.nanoTime() - t) / 1e9)

Best regards,
Alexander
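Sketch referenced in Rakesh's note above: a minimal, hypothetical illustration of the local-combine effect he describes, using toy (Int, Int) keys and a sum combiner instead of matrix blocks. reduceByKey merges values that share a key within a partition before anything crosses the network, so co-locating multiple records per key in a partition reduces (or here, eliminates) the shuffle:

import org.apache.spark.HashPartitioner

// Toy pair RDD with two records per key, one record per partition.
val pairs = sc.parallelize(
  Seq(((0, 0), 1.0), ((0, 0), 2.0), ((1, 1), 3.0), ((1, 1), 4.0)), 4)

// One record per partition: the map-side combine has nothing to merge locally,
// so every record is shuffled to bring equal keys together.
val spread = pairs.reduceByKey(_ + _)

// Pack equal keys into the same partition first (partitionBy shuffles once);
// reduceByKey then reuses the existing partitioner and merges each key's
// values entirely within its partition, with no further shuffle.
val packed = pairs.partitionBy(new HashPartitioner(2)).reduceByKey(_ + _)

packed.collect()  // Array(((0,0),3.0), ((1,1),7.0)), order may vary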