I am trying to load CSV data into Parquet using Spark SQL.
It consistently runs out of memory.
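
The statement itself is essentially an insert from a text-backed Hive table into a
Parquet-backed one, along these lines (the table names here are illustrative, not my
exact schema):

INSERT OVERWRITE TABLE catalog_returns_parquet
SELECT * FROM catalog_returns_csv;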

The environment is:
        * standalone cluster using HDFS and the Hive metastore from HDP 2.0
        * Spark 1.1.0
        * Parquet jar files (v1.5) explicitly added when starting spark-sql (roughly as sketched below)
        * 20 workers - EC2 r3.large - with SPARK_DAEMON_MEMORY set to 10g
        * 1 master - EC2 r3.xlarge
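
For reference, this is roughly how I start the CLI with the Parquet v1.5 jars on the
classpath; the jar names and master URL below are illustrative, and spark-sql simply
passes these options through to spark-submit:

spark-sql --master spark://<master-host>:7077 \
  --jars parquet-common-1.5.0.jar,parquet-encoding-1.5.0.jar,parquet-column-1.5.0.jar,parquet-hadoop-1.5.0.jar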


The input is split across 12 files:
hdfs dfs -ls /tpcds/fcsv/catalog_returns
Found 12 items
-rw-r--r--   3 spark hdfs  282305091 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000000_0
-rw-r--r--   3 spark hdfs  282037998 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000001_0
-rw-r--r--   3 spark hdfs  276284419 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000002_0
-rw-r--r--   3 spark hdfs  269675703 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000003_0
-rw-r--r--   3 spark hdfs  269673166 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000004_0
-rw-r--r--   3 spark hdfs  269678197 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000005_0
-rw-r--r--   3 spark hdfs  153478133 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000006_0
-rw-r--r--   3 spark hdfs  147586385 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000007_0
-rw-r--r--   3 spark hdfs  147542545 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000008_0
-rw-r--r--   3 spark hdfs  141161085 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000009_0
-rw-r--r--   3 spark hdfs   12110104 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000010_0
-rw-r--r--   3 spark hdfs    6374442 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000011_0

The failure stack from spark-sql is this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
        parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
        parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
        parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
        parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
        parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
        parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
        parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
        parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
        parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
        parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
        parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
        parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
        parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
        org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
        org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
        org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Am I missing something? Is this a case of "wrong tool for the job"? 

Regards,
dd
