Aniket,

The solution was to add a sort so that only one file is written at a time,
which minimizes the memory footprint of columnar formats like Parquet.
That fix has been released for quite a while, so memory issues caused by
Parquet are rarer now. If you're using Parquet default settings and a recent
Spark version, you should be fine.
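
As a rough illustration of why the sort helps, here is a minimal sketch in
plain Scala. It is not Spark's actual writer code: Record,
peakOpenWritersUnsorted and peakOpenWritersSorted are hypothetical names, and
each "open writer" only stands in for a Parquet file that is buffering data
in memory.

```scala
// Illustrative only: counts how many partition "writers" are open at once.
// Each open writer stands in for a Parquet file buffering data in memory.
object SortedWriteSketch {
  final case class Record(partition: String, value: Long)

  // Unsorted input: a writer is opened the first time a partition is seen and
  // all writers stay open until the end, so the peak equals the number of
  // distinct partitions.
  def peakOpenWritersUnsorted(records: Seq[Record]): Int =
    records.map(_.partition).distinct.size

  // Sorted input: when the partition changes, the current writer is closed
  // before the next one is opened, so at most one writer is open at a time.
  def peakOpenWritersSorted(records: Seq[Record]): Int = {
    val sorted = records.sortBy(_.partition)
    var current: Option[String] = None
    var open = 0
    var peak = 0
    for (r <- sorted) {
      if (!current.contains(r.partition)) {
        if (current.isDefined) open -= 1 // close the previous writer
        open += 1                        // open a writer for the new partition
        current = Some(r.partition)
        peak = math.max(peak, open)
      }
    }
    peak
  }
}
```

With records spread over three partitions, the unsorted count is three open
writers while the sorted count stays at one, which is the memory saving
described above.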

rb

On Sun, Nov 20, 2016 at 3:35 AM, Aniket <aniket.bhatna...@gmail.com> wrote:

> Was anyone able to find a solution or recommended conf for this? I am
> running into the same "java.lang.OutOfMemoryError: Direct buffer memory"
> but during snappy compression.
>
> Thanks,
> Aniket
>
> On Tue, Sep 23, 2014 at 7:04 PM Aaron Davidson [via Apache Spark
> Developers List] <[hidden email]> wrote:
>
>> This may be related: https://github.com/Parquet/parquet-mr/issues/211
>>
>> Perhaps if we change our configuration settings for Parquet it would get
>> better, but the performance characteristics of Snappy are pretty bad here
>> under some circumstances.
>>
>> On Tue, Sep 23, 2014 at 10:13 AM, Cody Koeninger <[hidden email]> wrote:
>>
>> > Cool, that's pretty much what I was thinking as far as configuration goes.
>> >
>> > Running on Mesos.  Worker nodes are amazon xlarge, so 4 core / 15g. I've
>> > tried executor memory sizes as high as 6G.
>> > Default hdfs block size 64m, about 25G of total data written by a job with
>> > 128 partitions.  The exception comes when trying to read the data (all
>> > columns).
>> >
>> > Schema looks like this:
>> >
>> > case class A(
>> >   a: Long,
>> >   b: Long,
>> >   c: Byte,
>> >   d: Option[Long],
>> >   e: Option[Long],
>> >   f: Option[Long],
>> >   g: Option[Long],
>> >   h: Option[Int],
>> >   i: Long,
>> >   j: Option[Int],
>> >   k: Seq[Int],
>> >   l: Seq[Int],
>> >   m: Seq[Int]
>> > )
>> >
>> > We're just going back to gzip for now, but might be nice to help someone
>> > else avoid running into this.
>> >
>> > On Tue, Sep 23, 2014 at 11:18 AM, Michael Armbrust <[hidden email]> wrote:
>> >
>> > > I actually submitted a patch to do this yesterday:
>> > > https://github.com/apache/spark/pull/2493
>> > >
>> > > Can you tell us more about your configuration?  In particular, how much
>> > > memory/cores do the executors have and what does the schema of your data
>> > > look like?
>> > >
>> > > On Tue, Sep 23, 2014 at 7:39 AM, Cody Koeninger <[hidden email]> wrote:
>> > >
>> > >> So as a related question, is there any reason the settings in SQLConf
>> > >> aren't read from the spark context's conf?  I understand why the sql conf
>> > >> is mutable, but it's not particularly user friendly to have most spark
>> > >> configuration set via e.g. defaults.conf or --properties-file, but for
>> > >> spark sql to ignore those.
>> > >>
>> > >> On Mon, Sep 22, 2014 at 4:34 PM, Cody Koeninger <[hidden email]> wrote:
>> > >>
>> > >> > After commit 8856c3d8 switched from gzip to snappy as default parquet
>> > >> > compression codec, I'm seeing the following when trying to read parquet
>> > >> > files saved using the new default (same schema and roughly same size as
>> > >> > files that were previously working):
>> > >> >
>> > >> > java.lang.OutOfMemoryError: Direct buffer memory
>> > >> >         java.nio.Bits.reserveMemory(Bits.java:658)
>> > >> >         java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> > >> >         java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>> > >> >         parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:99)
>> > >> >         parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:43)
>> > >> >         java.io.DataInputStream.readFully(DataInputStream.java:195)
>> > >> >         java.io.DataInputStream.readFully(DataInputStream.java:169)
>> > >> >         parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:201)
>> > >> >         parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:521)
>> > >> >         parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:493)
>> > >> >         parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:546)
>> > >> >         parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:339)
>> > >> >         parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
>> > >> >         parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
>> > >> >         parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:265)
>> > >> >         parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
>> > >> >         parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
>> > >> >         parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
>> > >> >         parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
>> > >> >         parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
>> > >> >         org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
>> > >> >         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> > >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> > >> >         scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> > >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> > >> >         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> > >> >         scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
>> > >> >         scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
>> > >> >         org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:220)
>> > >> >         org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:219)
>> > >> >         org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>> > >> >         org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>> > >> >         org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> > >> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> > >> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> > >> >         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> > >> >         org.apache.spark.scheduler.Task.run(Task.scala:54)
>> > >> >         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
>> > >> >         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> > >> >         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> > >> >         java.lang.Thread.run(Thread.java:722)
>>
>>
>>
>
> ------------------------------
> View this message in context: Re: OutOfMemoryError on parquet
> SnappyDecompressor
> <http://apache-spark-developers-list.1001551.n3.nabble.com/OutOfMemoryError-on-parquet-SnappyDecompressor-tp8517p19965.html>
> Sent from the Apache Spark Developers List mailing list archive
> <http://apache-spark-developers-list.1001551.n3.nabble.com/> at
> Nabble.com.
>



-- 
Ryan Blue
Software Engineer
Netflix
