I would recommend caching; if you can't persist the dataset, iterative
algorithms will not work well, because every iteration has to recompute it.
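
Whether persisting is feasible depends on how much storage memory the
executors offer. The following is only a back-of-the-envelope sketch. It
assumes the legacy Spark 1.x memory model, where storage memory is roughly
executor heap x spark.storage.memoryFraction x a safety fraction (taken here
to be 0.9), and it plugs in the settings quoted later in this thread. The
function names are hypothetical, and the size of a serialized dataset in
memory can differ from its size on disk.

```python
def approx_storage_memory_gib(num_executors, executor_memory_gib,
                              memory_fraction, safety_fraction=0.9):
    """Rough cluster-wide cache capacity in GiB.

    Assumption: legacy Spark 1.x model, where each executor reserves about
    heap * memory_fraction * safety_fraction for cached blocks.
    """
    return (num_executors * executor_memory_gib
            * memory_fraction * safety_fraction)


def fits_in_cache(dataset_gib, capacity_gib):
    """True if the dataset would fit entirely in storage memory."""
    return dataset_gib <= capacity_gib


# Settings quoted in the thread: --num-executors 128 --executor-memory 1G
# and spark.storage.memoryFraction=0.2.
capacity = approx_storage_memory_gib(128, 1.0, 0.2)

# The 180GB figure is the feature-selection dataset mentioned in the thread.
print("approx. cache capacity (GiB):", round(capacity, 2))
print("180 GiB fits in memory:", fits_in_cache(180.0, capacity))
```

If the estimate says the data does not fit, a disk-backed storage level such
as MEMORY_AND_DISK_SER (already tried in this thread) or larger executors are
the usual knobs to look at.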

I don't think calling count on the dataset is problematic; every iteration
in LBFGS iterates over the whole dataset and does a lot more computation
than count().
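
As a rough illustration of both points (this is a toy model in plain Python,
not Spark or MLlib code, and every name in it is hypothetical): count() adds
one full pass on top of the passes the iterations already make, and without
caching every one of those passes recomputes the source.

```python
class ToyDataset:
    """Toy stand-in for an RDD: an uncached dataset is rebuilt on every pass."""

    def __init__(self, build_rows, cached=False):
        self._build_rows = build_rows   # callable standing in for an expensive join
        self._cached = cached
        self._rows = None
        self.recomputations = 0         # how many times the source was rebuilt

    def rows(self):
        if self._cached and self._rows is not None:
            return self._rows           # served from the cache, no recomputation
        self.recomputations += 1
        rows = list(self._build_rows())
        if self._cached:
            self._rows = rows
        return rows

    def count(self):
        return len(self.rows())


def run_toy_optimizer(dataset, num_iterations):
    """One count() up front, then one full pass over the data per iteration."""
    num_examples = dataset.count()
    for _ in range(num_iterations):
        _ = sum(dataset.rows()) / num_examples   # stand-in for a gradient pass
    return num_examples


def expensive_source():
    return range(1000)


uncached = ToyDataset(expensive_source, cached=False)
cached = ToyDataset(expensive_source, cached=True)
run_toy_optimizer(uncached, num_iterations=10)
run_toy_optimizer(cached, num_iterations=10)
print("recomputations without cache:", uncached.recomputations)
print("recomputations with cache:", cached.recomputations)
```

In this toy model the count is a single pass among many, so removing it
changes little; caching is what removes the repeated recomputation.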

It would be helpful to see an error that actually occurs within LBFGS.  With
the given stack trace, I can't tell which part of LBFGS it's happening in.

On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
gsala...@ime.usp.br> wrote:

> Yeah, I can call count before that and it works. I was also caching too
> many tables, so I removed those caches. Now there is no caching, but it
> gets really slow since my table RDD is recomputed many times.
> I also hacked the LBFGS code to accept the number of examples, which I
> calculated separately in a Spark SQL query, but that just moved the
> location of the problem.
>
> The query I'm running looks like this:
>
> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
> tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>
> mappedFields contains a list of the fields I'm interested in. The result
> of that query goes through some transformations (including sampling)
> before being passed to LBFGS.
>
> My dataset is 180GB just for feature selection, and I'm planning to use
> 450GB to train the final model. I'm using 16 c3.2xlarge EC2 instances,
> which means I have 240GB of RAM available.
>
> Any suggestions? I'm starting to look into the algorithm because I don't
> understand why it needs to count the dataset.
>
> Thanks
>
> Gustavo
>
> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>
>> Is that error actually occurring in LBFGS?  It looks like it might be
>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>> you're trying to do is making the dataset size explode a bit.)  Are you
>> able to call count() (or any RDD action) on the data before you pass it to
>> LBFGS?
>>
>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>> gsala...@ime.usp.br> wrote:
>>
>>> I just did, with the same error.
>>> I think the problem is the "data.count()" call in LBFGS, because counting
>>> a huge dataset like that is naive.
>>> I was thinking of writing my own version of LBFGS that, instead of calling
>>> data.count(), takes that value as a parameter, which I will calculate with
>>> a Spark SQL query.
>>>
>>> I will let you know.
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Can you try increasing your driver memory, reducing the executors and
>>>> increasing the executor memory?
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>>> gsala...@ime.usp.br> wrote:
>>>>
>>>>> Hi there:
>>>>>
>>>>> I'm using the LBFGS optimizer to train a logistic regression model. The
>>>>> code I implemented follows the pattern shown in
>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but the
>>>>> training data is obtained from a Spark SQL RDD.
>>>>> The problem I'm having is that LBFGS tries to count the elements in my
>>>>> RDD, and that results in an OOM exception since my dataset is huge.
>>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on
>>>>> Hadoop YARN. My dataset is about 150 GB, but I sample it (I take only 1%
>>>>> of the data) in order to scale logistic regression.
>>>>> The exception I'm getting is this:
>>>>>
>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>         at
>>>>> com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>         at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>         at
>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>         at
>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>         at
>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>         at
>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>         at
>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>         at
>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>         at
>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>         at
>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>         at
>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>         at
>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>         at
>>>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>
>>>>> I'm using these parameters at runtime:
>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>
>>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>>> error.
>>>>> I would appreciate any help with this problem. I have been trying to
>>>>> solve it for days and I'm running out of time and hair.
>>>>>
>>>>> Thanks
>>>>> Gustavo
