I am trying to multiply a large matrix, stored in Parquet format, by a vector. I am being careful not to cache the RDD in memory, but I am still getting an OOM error from the Parquet reader:

    15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0
    (TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
        at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
        ...
Specifically, the matrix is a 46752-by-54843120 dense matrix of 32-bit floats stored in Parquet format (each row is about 1.7 GB uncompressed). The following code loads this matrix as a Spark IndexedRowMatrix and multiplies it by a random vector. The rows are stored with an associated string label, and the floats have to be converted to doubles because IndexedRows can only hold doubles:

    val rows = {
      sqlContext.read.parquet(datafname).rdd.map {
        case SQLRow(rowname: String, values: WrappedArray[Float]) =>
          // DenseVectors have to be doubles
          val vector = new DenseVector(values.toArray.map(v => v.toDouble))
          new IndexedRow(indexLUT(rowname), vector)
      }
    }
    val nrows: Long = 46752
    val ncols = 54843120
    val A = new IndexedRowMatrix(rows, nrows, ncols)
    A.rows.unpersist() // doesn't help avoid the OOM
    val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data)
    A.multiply(x).rows.collect

I am using the following options when running:

    --driver-memory 220G --num-executors 203 --executor-cores 4
    --executor-memory 25G --conf spark.storage.memoryFraction=0

There are 25573 partitions in the Parquet file, so the uncompressed Float values of each partition should come to less than 4 GB; I would expect this to mean the current executor memory is much more than sufficient (I cannot raise the executor-memory setting). Any ideas why this is running into OOM errors, and how to fix it?

The only thought I've had is that it could be related to the fact that there are only about 4.5K physical part-files for the Parquet dataset, while Spark partitions it into 25K partitions when loading the dataframe. Some rows must therefore be split across partitions on different executors, so maybe Spark is caching portions of rows to aid with shuffling. If this is the case, any suggestions on how to ameliorate the situation?
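In case it helps with diagnosis, here is a quick, untested sketch I've been considering to check whether the 25K read partitions are badly skewed relative to the ~4.5K part-files (it scans the whole dataset once; datafname is as above, and the count-and-sort is just illustrative):

    // Hypothetical diagnostic: count records per read partition to see
    // whether some partitions hold far more rows than others.
    val partitionCounts = sqlContext.read.parquet(datafname).rdd
      .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
      .collect()
    // Print the ten heaviest partitions
    partitionCounts.sortBy(-_._2).take(10).foreach(println)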
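Separately, one workaround I've sketched (untested) is to skip building per-row DenseVectors of doubles entirely and compute the product row by row against a broadcast copy of x, so each record only ever holds its Float values. This assumes the usual sc SparkContext, and indexLUT and datafname are as in the snippet above; x as doubles is roughly 440 MB (54843120 * 8 bytes), which should be fine to broadcast:

    // Untested sketch: broadcast x once and compute each row's dot
    // product directly on the Float values, avoiding the Float -> Double
    // copy of every row into a DenseVector.
    val xBroadcast = sc.broadcast(BDV.rand(ncols).data) // Array[Double]
    val product = sqlContext.read.parquet(datafname).rdd.map {
      case SQLRow(rowname: String, values: WrappedArray[Float]) =>
        val x = xBroadcast.value
        var dot = 0.0
        var i = 0
        while (i < values.length) {
          dot += values(i) * x(i) // each Float is widened on the fly
          i += 1
        }
        (indexLUT(rowname), dot)
    }.collect()

This wouldn't explain the reader-side OOM, but it would at least halve the per-record footprint during the multiply.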