I raised driver memory to 30G and spark.driver.maxResultSize to 25G, and this time ran it in pyspark.
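
For reference, the launch was essentially the previous one with the raised settings, i.e. roughly (assuming the other flags stayed the same as in the earlier run):

/opt/spark/2.0.0/bin/pyspark --executor-memory 10G --num-executors 128 --executor-cores 2 --driver-memory 30G --conf spark.driver.maxResultSize=25G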

Code run:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
import logging

cat_int = ['bigfeature']

stagesIndex = []
stagesOhe   = []
for c in cat_int:
  stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
  stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c),
                                 outputCol="{}OHE".format(c)))

df2 = df

for i in range(len(stagesIndex)):
  logging.info("Starting with {}".format(cat_int[i]))
  model = stagesIndex[i].fit(df2)   # fit once and reuse the fitted model
  logging.info("Fitted. Now transforming:")
  df2 = model.transform(df2)
  logging.info("Transformed. Now showing transformed:")
  df2.show()
  logging.info("OHE")
  df2 = stagesOhe[i].transform(df2)   # OneHotEncoder is a pure transformer, no fit needed
  logging.info("Transformed. Now showing OHE:")
  df2.show()
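
(A sanity check I would add before fitting, hypothetical and not part of the run above, is to confirm the cardinality StringIndexer has to materialize on the driver:

n = df.select("bigfeature").distinct().count()   # ~56 million in this case

since the fitted StringIndexerModel keeps every distinct label in memory.)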

Now I get this error:

2016-08-04 08:53:44,839 INFO       Starting with bigfeature                   
ukStringIndexer_442b8e11e3294de9b83a
2016-08-04 09:06:18,147 INFO       Fitted. Now transforming:
16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Failure.recover(Try.scala:216)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
        at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
        at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
        ... 8 more
16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is not outstanding
2016-08-04 09:12:07,016 INFO       Transformed. Now showing transformed:
16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, 10.10.91.9:50010]
16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor exception for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
java.io.IOException: Bad response ERROR for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from datanode 10.10.10.12:50010
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 09:13:48 WARN DFSClient: Error Recovery for block BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad datanode 10.10.10.12:50010
Traceback (most recent call last):
  File "<stdin>", line 7, in <module>
  File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in show
    print(self._jdf.showString(n, truncate))
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
        at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
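
If I read the trace right, the OOM hits while ClosureCleaner serializes the task closure, and the closure drags the fitted StringIndexerModel (the full label-to-index map) along with it. A rough back-of-envelope, assuming ~20-character labels plus hash-map overhead (both numbers are guesses):

n_labels = 56 * 1000 * 1000
bytes_per_entry = 60    # assumption: ~20-char Java string plus map overhead
print(n_labels * bytes_per_entry / 1e9)   # ~3.4 GB

That is already past the ~2 GB limit of the single byte array that Java serialization writes into (the ByteArrayOutputStream.grow frame above), so raising driver memory further probably cannot help. The earlier timeout warning names its own knob (spark.rpc.askTimeout, e.g. --conf spark.rpc.askTimeout=600s), but that would only buy time, not memory.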

Ben

> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
> 
> Hi,
> 
> I want to one-hot encode a column containing 56 million distinct values. My 
> dataset is 800M rows x 17 columns.
> I first apply a StringIndexer, but it already breaks there, giving an OOM 
> Java heap space error.
> 
> I launch my app on YARN with:
> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 
> --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
> 
> After grabbing the data, I run:
> 
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
> 
> val catInts = Array("bigfeature")
> 
> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
> for (c <- catInts) {
>   println(s"starting with $c")
>   val i = new StringIndexer()
>     .setInputCol(c)
>     .setOutputCol(s"${c}Index")
>   stagesIndex += i
> 
>   val o = new OneHotEncoder()
>     .setDropLast(false)
>     .setInputCol(s"${c}Index")
>     .setOutputCol(s"${c}OHE")
>   stagesOhe += o
> }
> 
> println(s"Applying string indexers: fitting")
> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> val dfFitted = pipelined.fit(df)
> 
> 
> Then, the application master shows a "countByValue at StringIndexer.scala" 
> stage taking 1.8 minutes (so very fast). 
> Afterwards, the shell console hangs for a while. What is it doing now? After 
> some time, it shows:
> 
> scala> val dfFitted = pipelined.fit(df)
>                                                                       
> java.lang.OutOfMemoryError: Java heap space
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>   at 
> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>   at 
> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>   at 
> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>   at 
> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>   at 
> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>   at 
> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>   ... 16 elided
> 
> 
> 
