I would hope that this kind of workflow works. Have you tried using saveAsParquetFile instead of inserting directly into a Hive table (you could still register the result as an external table afterwards)? Right now, inserting into Hive tables goes through Hive's SerDe instead of our native Parquet code, so we have less control over what is happening. If you go down the saveAsParquetFile route, you might also try repartition to increase the number of partitions (and thus decrease the amount of data buffered per partition); see the sketch below.
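Something along these lines is what I have in mind (just a sketch against the 1.1 Scala API; the table name, partition count, and output path are placeholders, and it assumes your CSV data is already registered as a table in the metastore):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)  // sc: your existing SparkContext

    // Read the CSV-backed table ("catalog_returns_csv" is a placeholder name).
    val rows = hiveContext.sql("SELECT * FROM catalog_returns_csv")

    // More partitions means less data buffered per Parquet writer before a
    // page is flushed; 200 is just a starting point to tune.
    rows.repartition(200).saveAsParquetFile("/tpcds/parquet/catalog_returns")

    // To query the result again from Spark, load it back and register it:
    hiveContext.parquetFile("/tpcds/parquet/catalog_returns")
      .registerTempTable("catalog_returns")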
On Tue, Sep 23, 2014 at 1:36 PM, Dan Dietterich <dan_dietter...@yahoo.com.invalid> wrote:

> I am trying to load data from csv format into parquet using Spark SQL.
> It consistently runs out of memory.
>
> The environment is:
>
> - standalone cluster using HDFS and Hive metastore from HDP2.0
> - spark 1.1.0
> - parquet jar files (v1.5) explicitly added when starting spark-sql
> - 20 workers - ec2 r3.large - set with SPARK_DAEMON_MEMORY of 10g
> - 1 master - ec2 r3.xlarge
>
> The input is split across 12 files:
>
> hdfs dfs -ls /tpcds/fcsv/catalog_returns
> Found 12 items
> -rw-r--r--  3 spark hdfs  282305091 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000000_0
> -rw-r--r--  3 spark hdfs  282037998 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000001_0
> -rw-r--r--  3 spark hdfs  276284419 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000002_0
> -rw-r--r--  3 spark hdfs  269675703 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000003_0
> -rw-r--r--  3 spark hdfs  269673166 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000004_0
> -rw-r--r--  3 spark hdfs  269678197 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000005_0
> -rw-r--r--  3 spark hdfs  153478133 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000006_0
> -rw-r--r--  3 spark hdfs  147586385 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000007_0
> -rw-r--r--  3 spark hdfs  147542545 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000008_0
> -rw-r--r--  3 spark hdfs  141161085 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000009_0
> -rw-r--r--  3 spark hdfs   12110104 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000010_0
> -rw-r--r--  3 spark hdfs    6374442 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000011_0
>
> The failure stack from spark-sql is this:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage
> 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
>     parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
>     parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
>     parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
>     parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
>     parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
>     parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
>     parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
>     parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
>     parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
>     parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
>     org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
>     org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
>     org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
>     org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>     org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>     parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
>     parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>     parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>     org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
>     org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
>     org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
>     org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
>     org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
>     org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:745)
>
> Am I missing something? Is this a case of "wrong tool for the job"?
>
> Regards,
> dd