I would expect this kind of workflow to work.

I'm curious whether you have tried saveAsParquetFile instead of inserting
directly into a Hive table (you could still register the output as an
external table afterwards).  Right now, inserting into Hive tables goes
through their SerDe instead of our native Parquet code, so we have less
control over what is happening.  If you go down the saveAsParquetFile
route, you might try repartition to increase the number of partitions
(and thus decrease the amount of data buffered per partition), along the
lines of the sketch below.
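
Something roughly like this from spark-shell (just a sketch against the
Spark 1.1 API; the source table name, partition count, and output path
are placeholders you would substitute):

    // sc is the SparkContext provided by spark-shell
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    // Select from the CSV-backed table, raise the partition count so each
    // task buffers less Parquet data at once, then write through Spark
    // SQL's native Parquet code path instead of the Hive SerDe.
    hiveContext
      .sql("SELECT * FROM catalog_returns_csv")            // placeholder table name
      .repartition(200)                                    // tune for your data volume
      .saveAsParquetFile("/tpcds/parquet/catalog_returns") // placeholder output path

More partitions mean more, smaller Parquet files, trading a little read
efficiency for much lower memory pressure in each writer; once the files
are written, you can register an external table over that directory.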

On Tue, Sep 23, 2014 at 1:36 PM, Dan Dietterich <
dan_dietter...@yahoo.com.invalid> wrote:

> I am trying to load data from CSV format into Parquet using Spark SQL.
> It consistently runs out of memory.
>
> The environment is:
>
>    - standalone cluster using HDFS and the Hive metastore from HDP 2.0
>    - Spark 1.1.0
>    - Parquet jar files (v1.5) explicitly added when starting spark-sql
>    - 20 workers - ec2 r3.large - with SPARK_DAEMON_MEMORY set to 10g
>    - 1 master - ec2 r3.xlarge
>
>
> The input is split across 12 files:
> hdfs dfs -ls /tpcds/fcsv/catalog_returns
> Found 12 items
> -rw-r--r--   3 spark hdfs  282305091 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000000_0
> -rw-r--r--   3 spark hdfs  282037998 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000001_0
> -rw-r--r--   3 spark hdfs  276284419 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000002_0
> -rw-r--r--   3 spark hdfs  269675703 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000003_0
> -rw-r--r--   3 spark hdfs  269673166 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000004_0
> -rw-r--r--   3 spark hdfs  269678197 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000005_0
> -rw-r--r--   3 spark hdfs  153478133 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000006_0
> -rw-r--r--   3 spark hdfs  147586385 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000007_0
> -rw-r--r--   3 spark hdfs  147542545 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000008_0
> -rw-r--r--   3 spark hdfs  141161085 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000009_0
> -rw-r--r--   3 spark hdfs   12110104 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000010_0
> -rw-r--r--   3 spark hdfs    6374442 2014-09-22 11:31
> /tpcds/fcsv/catalog_returns/000011_0
>
> The failure stack from spark-sql is this:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage
> 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
>         parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
>         parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
>         parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
>         parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
>         parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
>         parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
>         parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
>         parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
>         parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
>         parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
>         org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
>         org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
>         org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
>         org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
>         org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
>         parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
>         parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>         parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>         org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
>         org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
>         org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
>         org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
>         org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
>         org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
>
> Am I missing something? Is this a case of "wrong tool for the job"?
>
> Regards,
> dd
>
