Aniket,

The solution was to add a sort so that only one file is written at a time, which minimizes the memory footprint of columnar formats like Parquet. That fix has been released for quite a while, so memory issues caused by Parquet are rarer now. If you're using the Parquet defaults and a recent Spark version, you should be fine.
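A minimal sketch of the idea, assuming the Spark 2.x DataFrame API and a hypothetical "date" partition column (the released fix applies the equivalent sort inside Spark's write path; this snippet only illustrates why sorting bounds writer memory):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: sorting by the partition columns before a partitioned
    // write lets each task finish one output file before opening the next, so
    // only one set of Parquet column buffers is held in memory at a time.
    // Paths, the app name, and the "date" column are placeholders.
    val spark = SparkSession.builder().appName("sorted-parquet-write").getOrCreate()

    val df = spark.read.parquet("hdfs:///path/to/input")

    df.sortWithinPartitions("date")      // assumed partition column
      .write
      .partitionBy("date")
      .parquet("hdfs:///path/to/output")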
rb

On Sun, Nov 20, 2016 at 3:35 AM, Aniket <aniket.bhatna...@gmail.com> wrote:

> Was anyone able to find a solution or recommended conf for this? I am
> running into the same "java.lang.OutOfMemoryError: Direct buffer memory",
> but during snappy compression.
>
> Thanks,
> Aniket
>
> On Tue, Sep 23, 2014 at 7:04 PM, Aaron Davidson [via Apache Spark
> Developers List] <[hidden email]> wrote:
>
>> This may be related: https://github.com/Parquet/parquet-mr/issues/211
>>
>> Perhaps if we change our configuration settings for Parquet it would get
>> better, but the performance characteristics of Snappy are pretty bad here
>> under some circumstances.
>>
>> On Tue, Sep 23, 2014 at 10:13 AM, Cody Koeninger <[hidden email]> wrote:
>>
>>> Cool, that's pretty much what I was thinking as far as configuration
>>> goes.
>>>
>>> Running on Mesos. Worker nodes are Amazon xlarge, so 4 cores / 15g.
>>> I've tried executor memory sizes as high as 6G.
>>> Default HDFS block size is 64m, with about 25G of total data written by
>>> a job with 128 partitions. The exception comes when trying to read the
>>> data (all columns).
>>>
>>> Schema looks like this:
>>>
>>> case class A(
>>>   a: Long,
>>>   b: Long,
>>>   c: Byte,
>>>   d: Option[Long],
>>>   e: Option[Long],
>>>   f: Option[Long],
>>>   g: Option[Long],
>>>   h: Option[Int],
>>>   i: Long,
>>>   j: Option[Int],
>>>   k: Seq[Int],
>>>   l: Seq[Int],
>>>   m: Seq[Int]
>>> )
>>>
>>> We're just going back to gzip for now, but it might be nice to help
>>> someone else avoid running into this.
>>>
>>> On Tue, Sep 23, 2014 at 11:18 AM, Michael Armbrust <[hidden email]>
>>> wrote:
>>>
>>>> I actually submitted a patch to do this yesterday:
>>>> https://github.com/apache/spark/pull/2493
>>>>
>>>> Can you tell us more about your configuration? In particular, how much
>>>> memory/cores do the executors have, and what does the schema of your
>>>> data look like?
>>>>
>>>> On Tue, Sep 23, 2014 at 7:39 AM, Cody Koeninger <[hidden email]> wrote:
>>>>
>>>>> So as a related question, is there any reason the settings in SQLConf
>>>>> aren't read from the spark context's conf? I understand why the sql
>>>>> conf is mutable, but it's not particularly user friendly to have most
>>>>> spark configuration set via e.g. defaults.conf or --properties-file,
>>>>> but for spark sql to ignore those.
>>>>>
>>>>> On Mon, Sep 22, 2014 at 4:34 PM, Cody Koeninger <[hidden email]> wrote:
>>>>>
>>>>>> After commit 8856c3d8 switched from gzip to snappy as default parquet
>>>>>> compression codec, I'm seeing the following when trying to read
>>>>>> parquet files saved using the new default (same schema and roughly
>>>>>> same size as files that were previously working):
>>>>>>
>>>>>> java.lang.OutOfMemoryError: Direct buffer memory
>>>>>>     java.nio.Bits.reserveMemory(Bits.java:658)
>>>>>>     java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>>>>>>     java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>>>>>>     parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:99)
>>>>>>     parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:43)
>>>>>>     java.io.DataInputStream.readFully(DataInputStream.java:195)
>>>>>>     java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>>     parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:201)
>>>>>>     parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:521)
>>>>>>     parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:493)
>>>>>>     parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:546)
>>>>>>     parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:339)
>>>>>>     parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
>>>>>>     parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
>>>>>>     parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:265)
>>>>>>     parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
>>>>>>     parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
>>>>>>     parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
>>>>>>     parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
>>>>>>     parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
>>>>>>     org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
>>>>>>     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>     scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>>>     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>     scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
>>>>>>     scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
>>>>>>     org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:220)
>>>>>>     org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:219)
>>>>>>     org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>>>>>     org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>>>>>     org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
>>>>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>     java.lang.Thread.run(Thread.java:722)

-- 
Ryan Blue
Software Engineer
Netflix
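A minimal sketch of the gzip workaround discussed in the quoted thread, assuming a Spark 2.x session; the config key is Spark's documented spark.sql.parquet.compression.codec, while the paths and app name are placeholders:

    import org.apache.spark.sql.SparkSession

    // Switch the Parquet codec from the snappy default back to gzip, which
    // avoids snappy's direct (off-heap) buffer allocations at some CPU cost.
    // Input/output paths below are placeholders.
    val spark = SparkSession.builder().appName("parquet-gzip-sketch").getOrCreate()

    spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

    spark.read.parquet("hdfs:///path/to/input")
      .write
      .parquet("hdfs:///path/to/output-gzip")

Recent Spark versions also pick the same key up from the submit-time conf (e.g. --conf spark.sql.parquet.compression.codec=gzip), which addresses the SQLConf question above. And since "Direct buffer memory" errors are bounded by the JVM's -XX:MaxDirectMemorySize limit rather than the executor heap, raising that limit through spark.executor.extraJavaOptions is another option worth testing.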