I am trying to multiply a large matrix, stored in Parquet format, by a
vector, so I am being careful not to cache the RDD in memory, but I am
getting an OOM error from the Parquet reader:
 
    15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0 (TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
        at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        ...
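
One thing I noticed about the trace: the frame at the top,
ConsecutiveChunkList.readAll, is where Parquet materializes an entire row
group at once, so the peak allocation there is governed by the row-group
size the file was written with, not by Spark's partitioning. If that is
right, one thing I could try is regenerating the data with smaller row
groups. A sketch only (df stands in for the DataFrame used to generate the
matrix, and 64 MB is an arbitrary example value):

    // Sketch: write with smaller Parquet row groups so each readAll() call
    // materializes less at once. "parquet.block.size" is the standard
    // Parquet writer setting; 64 MB here is just an example figure.
    sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)
    df.write.parquet(datafname + "-small-rowgroups")  // df: placeholder DataFrame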

Specifically, the matrix is a 46752-by-54843120 dense matrix of 32-bit
floats stored in Parquet format (each row is about 1.7 GB uncompressed).

The following code loads this matrix as a Spark IndexedRowMatrix and
multiplies it by a random vector (the rows are stored with an associated
string label, and the floats have to be converted to doubles because
IndexedRow vectors can only hold doubles):

    val rows = sqlContext.read.parquet(datafname).rdd.map {
      // each record is (label, float array); DenseVector only holds
      // doubles, so the floats are widened on the way in
      case SQLRow(rowname: String, values: WrappedArray[Float]) =>
        val vector = new DenseVector(values.toArray.map(_.toDouble))
        new IndexedRow(indexLUT(rowname), vector)
    }
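
(A variant of the load I have also been considering, sketched under the
assumption that the columns are (rowname, values) in that order: positional
field access sidesteps the match on the erased WrappedArray[Float] type and
widens the floats in a single pass.)

    // Sketch of an alternative load, not what I currently run; assumes
    // column 0 is the string label and column 1 is the float array.
    val rowsAlt = sqlContext.read.parquet(datafname).rdd.map { row =>
      val rowname = row.getString(0)
      val values  = row.getAs[Seq[Float]](1)
      val arr = new Array[Double](values.length)
      var i = 0
      while (i < arr.length) { arr(i) = values(i); i += 1 }
      new IndexedRow(indexLUT(rowname), new DenseVector(arr))
    }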

    val nrows: Long = 46752
    val ncols = 54843120
    val A = new IndexedRowMatrix(rows, nrows, ncols)
    A.rows.unpersist() // doesn't help avoid OOM

    val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data)
    A.multiply(x).rows.collect
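
(Worth noting: x here is roughly 440 MB of doubles that has to reach every
executor. A sketch of an alternative I may try, assuming sc is the
SparkContext: broadcast the vector once and reduce each row to its dot
product directly.)

    // Sketch only: broadcast the vector and compute per-row dot products
    // instead of shipping a local DenseMatrix with the multiply.
    val bx = sc.broadcast(BDV.rand(ncols).data)
    val y = A.rows.map { r =>
      val v = r.vector.toArray
      var s = 0.0
      var i = 0
      while (i < v.length) { s += v(i) * bx.value(i); i += 1 }
      (r.index, s)  // (row index, dot product)
    }.collect()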

I am using the following options when submitting the job:

    --driver-memory 220G
    --num-executors 203
    --executor-cores 4
    --executor-memory 25G
    --conf spark.storage.memoryFraction=0

The Parquet file is read as 25573 partitions, so the uncompressed float
values in each partition should come to less than 4 GB; I would expect this
to imply that the current executor memory is much more than sufficient (I
cannot raise the executor-memory setting any further).
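
(The arithmetic behind that estimate, using the figures quoted above:)

    // Back-of-envelope from the numbers earlier in this post.
    val rowsPerPartition = 46752.0 / 25573.0        // ≈ 1.83 rows per partition
    val gbPerPartition   = rowsPerPartition * 1.7   // ≈ 3.1 GB of floats, < 4 GB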

Any ideas why this is running into OOM errors, and how to fix it? The only
thought I've had is that there are only about 4.5K physical part-files in
the Parquet dataset, but Spark splits them into 25K partitions when loading
the dataframe, so some rows must be split across partitions on different
executors; maybe the reader is caching portions of rows to aid with
shuffling. If that is the case, any suggestions on how to ameliorate the
situation?
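
(In case it helps with diagnosis, this is the kind of per-partition count I
could collect to see how the rows land on the 25K splits; a sketch only:)

    // Diagnostic sketch: record count per input partition of the loaded RDD.
    val counts = sqlContext.read.parquet(datafname).rdd
      .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()
    counts.sortBy(-_._2).take(10).foreach(println)  // heaviest partitions first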



