The Parquet output writer allocates one block for each table partition it is processing and writes partitions in parallel. It will run out of memory if (number of partitions) × (Parquet block size) exceeds the available memory. You can try decreasing the number of partitions. Could you also share the value of "parquet.block.size" and the amount of memory available to you?
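For a back-of-envelope check, something like the following in spark-shell may help (a minimal sketch, not a verified diagnosis: the 128 MB fallback is parquet-mr's default, and the partition count and output path below are hypothetical):

    // Effective Parquet block size in bytes (parquet-mr defaults to 128 MB)
    val blockSize = sc.hadoopConfiguration.getLong("parquet.block.size", 128L * 1024 * 1024)

    // One block is buffered per partition being written, so roughly:
    val numPartitions = 60  // hypothetical; substitute your actual partition count
    println(s"~${numPartitions * blockSize / (1024 * 1024)} MB of heap for Parquet buffers")

    // Reducing the partition count lowers that bound, e.g. before writing:
    // res2.coalesce(10).write.parquet("/tmp/out")  // path is hypothetical

If the estimate comes out close to (or above) your executor heap, that would be consistent with the OOM you are seeing.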
2015-09-05 18:59 GMT+08:00 Yana Kadiyska <yana.kadiy...@gmail.com>:

> Hi folks, I have a strange issue. Trying to read a 7G file and do fairly
> simple stuff with it:
>
> I can read the file/do simple operations on it. However, I'd prefer to
> increase the number of partitions in preparation for more memory-intensive
> operations (I'm happy to wait, I just need the job to complete).
> Repartition seems to cause an OOM for me?
> Could someone shed light on, or speculate about, why this would happen -- I
> thought we repartition higher to relieve memory pressure?
>
> I'm using Spark 1.4.1 on CDH4 if that makes a difference.
>
> This works:
>
> val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id" === lit(254))
> res2.count
> res1: Long = 77885925
>
> scala> res2.explain
> == Physical Plan ==
> Filter (customer_id#314 = 254)
>  PhysicalRDD [....4], MapPartitionsRDD[11] at
>
> scala> res2.rdd.partitions.size
> res3: Int = 59
>
> This doesn't:
>
> scala> res2.repartition(60).count
> [Stage 2:> (1 + 45) / 59]15/09/05 10:17:21 WARN TaskSetManager: Lost task
> 2.0 in stage 2.0 (TID 62, fqdn): java.lang.OutOfMemoryError: Java heap space
>         at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
>         at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>         at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>         at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)