I increased the memory limits, and now it works fine. Thanks for the help.
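For anyone who hits the same OutOfMemoryError: the change is only a configuration one, roughly along the lines of the sketch below. The sizes are examples only, not the values from my cluster.

    // Sketch: raising the memory limits (example values, tune to your cluster).
    // Note that spark.driver.memory is normally set via spark-submit
    // (--driver-memory) or spark-defaults.conf, since the driver JVM is
    // already running by the time a SparkConf built in code is applied.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parquet-write")
      .set("spark.executor.memory", "8g")   // heap per executor
    val sc = new SparkContext(conf)

Equivalently, pass --executor-memory 8g (and --driver-memory if needed) to spark-submit or spark-shell.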
On 18 June 2015 at 04:01, Cheng Lian <lian.cs....@gmail.com> wrote:

> Does increasing executor memory fix the memory problem?
>
> How many columns does the schema contain? Parquet can be super memory
> consuming when writing wide tables.
>
> Cheng
>
>
> On 6/15/15 5:48 AM, Bipin Nag wrote:
>
> Hi Davies,
>
> I have tried the recent 1.4 and a 1.5-snapshot to either 1) open the parquet
> file and save it again, or 2) apply the schema to the RDD and save the
> dataframe as parquet, but now I get this error (right at the beginning):
>
> java.lang.OutOfMemoryError: Java heap space
>   at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>   at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>   at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>   at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>   at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>   at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>   at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>   at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>   at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>   at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>   at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>   at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>   at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>   at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>   at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>   at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
>   at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at parquet.io.api.Binary.fromByteArray(Binary.java:159)
>   at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:94)
>   at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:67)
>   at parquet.column.Encoding$4.initDictionary(Encoding.java:131)
>   at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:325)
>   at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
>   at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
>   at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>   at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
>   at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
>   at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
>   at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
>   at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
>   at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>   at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>   at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
>
> I am not sure whether this is related to your patch or to some other bug. My
> earlier error doesn't show up in the newer versions, so this is the problem
> to fix now.
>
> Thanks
>
> On 13 June 2015 at 06:31, Davies Liu <dav...@databricks.com> wrote:
>
>> Maybe it's related to a bug, which was fixed recently by
>> https://github.com/apache/spark/pull/6558
>>
>> On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag <bipin....@gmail.com> wrote:
>> > Hi Cheng,
>> >
>> > Yes, some rows contain unit instead of decimal values. I believe some rows
>> > from the original source don't have any value, i.e. they are null, and
>> > that shows up as unit. How do Spark SQL and Parquet handle null in place
>> > of decimal values, assuming the field is nullable? I will have to change
>> > it properly.
>> >
>> > Thanks for helping out.
>> > Bipin
>> >
>> > On 12 June 2015 at 14:57, Cheng Lian <lian.cs....@gmail.com> wrote:
>> >>
>> >> On 6/10/15 8:53 PM, Bipin Nag wrote:
>> >>
>> >> Hi Cheng,
>> >>
>> >> I am using the Spark 1.3.1 binary available for Hadoop 2.6. I am loading
>> >> an existing parquet file, then repartitioning it and saving it. Doing
>> >> this gives the error. The code for this doesn't look like the cause of
>> >> the problem; I have a feeling the source - the existing parquet file - is
>> >> the culprit.
>> >>
>> >> I created that parquet file using a JdbcRDD (pulled from Microsoft SQL
>> >> Server). First I saved the JdbcRDD as an object file on disk. Then I
>> >> loaded it again, made a dataframe from it using a schema, and saved that
>> >> as a parquet file.
>> >>
>> >> Following is the code.
>> >>
>> >> For saving the JdbcRDD (name = fully qualified table name, pk = primary
>> >> key column, lastpk = last id to pull):
>> >>
>> >>   val myRDD = new JdbcRDD( sc, () =>
>> >>       DriverManager.getConnection(url, username, password),
>> >>     "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
>> >>     1, lastpk, 1, JdbcRDD.resultSetToObjectArray)
>> >>   myRDD.saveAsObjectFile("rawdata/" + name)
>> >>
>> >> For applying the schema and saving the parquet file:
>> >>
>> >>   val myschema = schemamap(name)
>> >>   val myrdd =
>> >>     sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name).map(x =>
>> >>       org.apache.spark.sql.Row(x: _*))
>> >>
>> >> Have you tried to print out x here to check its contents?
>> >> My guess is that x actually contains unit values. For example, the
>> >> following Spark shell code can reproduce a similar exception:
>> >>
>> >>   import org.apache.spark.sql.types._
>> >>   import org.apache.spark.sql.Row
>> >>
>> >>   val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
>> >>   val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
>> >>   val df = sqlContext.createDataFrame(rdd, schema)
>> >>
>> >>   df.saveAsParquetFile("file:///tmp/foo")
>> >>
>> >>   val actualdata = sqlContext.createDataFrame(myrdd, myschema)
>> >>   actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)
>> >>
>> >> The schema StructType can be made manually, though I pull the table's
>> >> metadata and build one from it. It is a simple string translation (see
>> >> the SQL docs and/or the Spark data types).
>> >>
>> >> That is how I created the parquet file. Any help to solve the issue is
>> >> appreciated.
>> >> Thanks
>> >> Bipin
>> >>
>> >> On 9 June 2015 at 20:44, Cheng Lian <lian.cs....@gmail.com> wrote:
>> >>>
>> >>> Would you please provide a snippet that reproduces this issue? What
>> >>> version of Spark were you using?
>> >>>
>> >>> Cheng
>> >>>
>> >>> On 6/9/15 8:18 PM, bipin wrote:
>> >>>>
>> >>>> Hi,
>> >>>> When I try to save my data frame as a parquet file I get the following
>> >>>> error:
>> >>>>
>> >>>> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
>> >>>> org.apache.spark.sql.types.Decimal
>> >>>>   at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
>> >>>>   at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
>> >>>>   at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
>> >>>>   at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
>> >>>>   at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>> >>>>   at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>> >>>>   at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>> >>>>   at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
>> >>>>   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
>> >>>>   at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
>> >>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> >>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> >>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> >>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>>>   at java.lang.Thread.run(Thread.java:745)
>> >>>>
>> >>>> How do I fix this problem?
>> >>>>
>> >>>> --
>> >>>> View this message in context:
>> >>>> http://apache-spark-user-list.1001560.n3.nabble.com/BigDecimal-problem-in-parquet-file-tp23221.html
>> >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
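For the archives: the root cause of the original ClassCastException was that SQL NULLs from the JdbcRDD ended up as Unit values in the rows, while a nullable DecimalType column expects a real null (or a Decimal), not scala.runtime.BoxedUnit. The snippet below is only an illustrative sketch of the kind of cleanup that avoids it - it is not the exact code from this thread, and the paths and names (name, myschema) are the placeholders used above.

    import org.apache.spark.sql.Row

    // Load the raw object file and replace any boxed Unit values with null
    // before applying the schema, so nullable decimal columns get real nulls.
    val cleaned = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
      .map(_.map { v =>
        if (v.isInstanceOf[scala.runtime.BoxedUnit]) null else v
      })
      .map(arr => Row(arr: _*))

    val df = sqlContext.createDataFrame(cleaned, myschema)
    df.saveAsParquetFile("/home/bipin/stageddata/" + name)

With the Unit values replaced by nulls, the Parquet writer records the fields as missing instead of trying to cast BoxedUnit to Decimal.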